[SPARK-15784][ML] Add Power Iteration Clustering to spark.ml #15770
Conversation
Test build #68148 has finished for PR 15770 at commit
Test build #68150 has finished for PR 15770 at commit
ping @yanboliang
@wangmiao1981 Thanks for working on this. Since we are starting QA for 2.1, I will review this after the release.
Test build #71020 has finished for PR 15770 at commit
@Since("2.2.0") | ||
final val initMode = new Param[String](this, "initMode", "The initialization algorithm. " + | ||
"Supported options: 'random' and 'degree'.", | ||
(value: String) => validateInitMode(value)) |
What about using the validator ParamValidators.inArray[String](...) instead?
what is the difference? Thanks!
You would not need to write a separate function as you do below, and it will allow more user-friendly error messages in the future.
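For reference, a minimal sketch of the suggested change (assuming Spark ML's `ParamValidators` utility and the `initMode` Param shown above):

```scala
import org.apache.spark.ml.param.{Param, ParamValidators}

@Since("2.2.0")
final val initMode = new Param[String](this, "initMode", "The initialization algorithm. " +
  "Supported options: 'random' and 'degree'.",
  // Built-in validator instead of a hand-written validateInitMode helper.
  ParamValidators.inArray[String](Array("random", "degree")))
```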
Test build #72313 has finished for PR 15770 at commit
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.linalg.{Vector}
no need for brackets
import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions.{col}
same here
"Supported options: 'random' and 'degree'.", | ||
(value: String) => validateInitMode(value)) | ||
|
||
private[spark] def validateInitMode(initMode: String): Boolean = { |
Not needed, per the comment above.
@Since("2.2.0") | ||
@Experimental | ||
class PowerIterationClustering private[clustering] ( | ||
@Since("2.2.0") override val uid: String) |
indentation
 * Validates the input schema
 * @param schema input schema
 */
protected def validateSchema(schema: StructType): Unit = {
Instead of just validating the schema, we should validate and transform. You can follow the example in https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala#L92
+1
Also:
- This should check other input columns to make sure they are defined.
- This should add predictionCol, not check that it exists in the input.
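A sketch of the validate-and-transform pattern from KMeans, adapted to this PR (the `idCol`/`predictionCol` Params and column types are assumptions based on the surrounding discussion):

```scala
import org.apache.spark.ml.util.SchemaUtils
import org.apache.spark.sql.types.{IntegerType, LongType, StructType}

/**
 * Validates the input schema and appends the prediction column,
 * rather than only checking that columns exist.
 */
protected def validateAndTransformSchema(schema: StructType): StructType = {
  SchemaUtils.checkColumnType(schema, $(idCol), LongType) // check input column
  SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType) // add output column
}
```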
 * Default: "id"
 * @group param
 */
val idCol = new Param[String](this, "idCol", "column name for ids.")
Instead of making an `id` column, which does not convey much information, we should follow the example of K-Means and call it `prediction`. You already include the trait for that.
In the MLlib implementation, the clustering result is `case class Assignment(id: Long, cluster: Int)`. `idCol` is the node id and `cluster` is the id of the cluster that the node belongs to. I think `id` is still useful to represent the node. Otherwise, we would need to make sure that the output order of nodes is the same as the input order, which means `id` would be implicitly inferred from the row number.
PIC is different from K-Means. K-Means `transform` applies the `predict` method to each row of the input (i.e., each data point), while PIC `run` applies K-Means to the pseudo-eigenvector from the `powerIter` method. This is not a one-to-one map from the input dataset to the result. Please also see the comments below.
def setIdCol(value: String): this.type = set(idCol, value)

@Since("2.2.0")
override def transform(dataset: Dataset[_]): DataFrame = {
One issue with the current implementation is that extra columns are discarded (this is also the case with most graphframes algorithms), which breaks the general contract of transformers. After the transform, you should join the missing columns back in.
Also, a test for this use case would be useful.
The missing column is the `featureCol`. You want the `featureCol` to be in the result `DataFrame`, right? Thanks!
@thunterdb The prediction/transformation result `(id, cluster)` has a different size from the `features`. That is because `id` denotes the node id in the graph, so the number of result rows equals the number of nodes in the graph, while `features` are the graph weights and `|features|` is roughly `|nodes|^2 / 2`. So I can't directly join the result dataframe with the input dataset. Please correct me if I misunderstood your comments. Thanks!
  }
}

test("power iteration clustering") {
can you also add a test with a dataframe that has some extra data in it?
Added it.
@thunterdb Thanks for your review! I will address the comments soon.
@wangmiao1981 thanks a lot! I would be very happy to see this PR in Spark 2.2, and I will gladly help you with that.
Test build #73079 has finished for PR 15770 at commit
You are right, I had forgotten that for this algorithm the input is the edges, and the output is the label for each of the vertices. This is a tricky algorithm to put as a transformer, since it does not follow the usual convention that data should only be appended to the dataframe. I suggest we follow the same example as ALS and the mllib implementation of PIC. If we do not follow this pattern, then the model selection algorithms are not going to work. What do you think?
@thunterdb Thanks for your response. In the original JIRA, we have discussed why we want it to be a transformer. Let me find it and post it here.
Joseph K. Bradley added a comment - 31/Oct/16 18:14 Miao Wang Sorry for the slow response here. I do want us to add PIC to spark.ml, but we should discuss the design before the PR. Could you please close the PR for now but save the branch to re-open after discussion? Let's have a design discussion first. I agree that the big issue is that there isn't a clear way to make predictions on new data points. In fact, I've never heard of people trying to do so. Has anyone else? Assuming that prediction is not meaningful for PIC, then I don't think the algorithm fits within the Pipeline framework, though it's debatable. I see a few options:
My top choice is PIC as a Transformer. What do you think? CC Yanbo Liang, Seth Hendrickson, Nick Pentreath — opinions?
This seems like it fits the framework of a feature transformer. We could generate a real-valued feature column using the PIC algorithm where the values are just the components of the pseudo-eigenvector. Alternatively we could pipeline a KMeans clustering on the end, but I think it makes more sense to let users do that themselves - but that's up for debate.
Yanbo Liang added a comment - 02/Nov/16 09:30 - edited: I prefer #1 and #3, but it looks like we can achieve both goals. I think this scenario would be better for different users (ML users and GraphFrames users), but I'm still open to hearing your thoughts. Thanks.
I am checking ALS out to understand your suggestions. Thanks!
Test build #73238 has finished for PR 15770 at commit
@wangmiao1981 yes, I had seen the discussions there. I believe that eventually PIC should be moved into graphframes, but we can have a simple API in …
@thunterdb Per discussion with Yanbo, there is one concern with making it an Estimator. For every …
@thunterdb I have two concerns about making PIC an Estimator: …
But in your suggestion, the input of the PIC estimator is the edges, an RDD[(Long, Long, Double)], while the input of the PIC model is the vertex IDs. I totally agree that making PIC a Transformer is tricky, but I did not find a better way. Thanks.
ping @WeichenXu123
@wangmiao1981 Sorry for the delay, I will take a look later, thanks!
I am also concerned about the case of a high-centrality graph mentioned by @thunterdb.
And it seems we need a helper function converting a Graph structure into an adjacency-list-format dataframe, so that users can use this transformer conveniently?
val sparkSession = dataset.sparkSession
val rdd: RDD[(Long, Long, Double)] =
  dataset.select(col($(idCol)), col($(neighborCol)), col($(weightCol))).rdd.flatMap {
    case Row(id: Long, nbr: Vector, weight: Vector) =>
PIC requires the input graph matrix to be symmetric, and the weights should be non-negative. It would be better to check them here, but checking symmetry seems to cost too much; I have no good idea for now. cc @jkbradley Do you have some thoughts?
I think checking symmetry is too much for PIC in this data format. Maybe we can omit the check and add a comment plus an INFO message on the console to let users take care of it. @WeichenXu123
OK I agree.
I agree about not checking for symmetry as long as we document it.
But I do have one suggestion: Let's take neighbors and weights as Arrays, not Vectors. That may help prevent users from mistakenly passing in feature Vectors.
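A sketch of what the Array-based input could look like in `transform` (column names follow this PR; note that Spark SQL surfaces array columns as `Seq` at runtime):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col

val rdd: RDD[(Long, Long, Double)] =
  dataset.select(col($(idCol)), col($(neighborCol)), col($(weightCol))).rdd.flatMap {
    // @unchecked silences the unavoidable type-erasure warning on element types.
    case Row(id: Long, nbr: Seq[Long @unchecked], weight: Seq[Double @unchecked]) =>
      require(nbr.size == weight.size,
        "The length of the neighbor list must be equal to the length of the weight list.")
      // Lazy iterator zip: no temporary array of pairs.
      nbr.iterator.zip(weight.iterator).map { case (n, w) => (id, n, w) }
  }
```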
require(nbr.size == weight.size,
  "The length of the neighbor list must be equal to the length of the weight list.")
val ids = Array.fill(nbr.size)(id)
for (i <- 0 until ids.size) yield (ids(i), nbr(i).toLong, weight(i))}
This line of code does not perform well. Why do you expand the `id` var into an array filled with the same value?
You can use:
nbr.toArray.toIterator.zip(weight.toArray.toIterator).map(x => (id, x._1.toLong, x._2))
I remember last time you mentioned not using zip; I might have gotten it wrong. By the way, regarding "it seems we need a helper function": do you want a helper function that takes a GraphFrame and returns the RDD used by PIC?
Ah, I don't like Array.zip because it generates a temp array, but Array.toIterator.zip does not have this problem; it is an iterator zip.
I will address the review comments soon. Thanks! @WeichenXu123
Test build #83064 has finished for PR 15770 at commit
Test build #83230 has finished for PR 15770 at commit
@WeichenXu123, for the graph helper, MLlib has a version that takes …
@wangmiao1981 oh, not a big deal. What I thought is that it is possible for a user to use …
@WeichenXu123 Thanks for your review and reply! I agree with you that the helper can be discussed later as a potential enhancement.
@WeichenXu123 Any other comments? Thanks!
LGTM. ping @yanboliang
ping @yanboliang
ping @yanboliang
Just pinged @yanboliang on JIRA about me taking over shepherding this. It will need at least one update: change Since versions from 2.3.0 to 2.4.0. Sorry for the long wait @wangmiao1981 :(
I just reviewed this. Overall, it looks good. In addition to some comments here, I had some comments about the text in docstrings. For that text, I figured it'd be easiest to send a PR to your PR here: wangmiao1981#4
It'd be nice to add a test for invalid input:
- a negative weight
- mismatched lengths for neighbors & weights in one row
- (not symmetry since we don't check that)
Thanks @wangmiao1981 !
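A sketch of such tests (column names and the exact exception type are assumptions; checks inside the RDD only surface once the result is collected):

```scala
test("invalid input: mismatched lengths and negative weight") {
  val pic = new PowerIterationClustering().setK(2)

  // Mismatched neighbor/weight lengths in one row should trip the
  // require() in transform once the RDD is evaluated.
  val mismatched = spark.createDataFrame(Seq(
    (0L, Array(1L, 2L), Array(0.5))
  )).toDF("id", "neighbors", "weights")
  intercept[Exception] {
    pic.transform(mismatched).collect()
  }

  // A negative weight should be rejected once such a check is added.
  val negative = spark.createDataFrame(Seq(
    (0L, Array(1L), Array(-1.0))
  )).toDF("id", "neighbors", "weights")
  intercept[Exception] {
    pic.transform(negative).collect()
  }
}
```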
@Since("2.3.0") override val uid: String) | ||
extends Transformer with PowerIterationClusteringParams with DefaultParamsWritable { | ||
|
||
setDefault( |
nit: It'd be nice to put these defaults right next to the Param definitions in PowerIterationClusteringParams, so that the default specified in the docstring stays close to (and in sync with) the default specified by setDefault.
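For example (a sketch, assuming the Params trait from this PR):

```scala
import org.apache.spark.annotation.Since
import org.apache.spark.ml.param.{Param, Params, ParamValidators}

private[clustering] trait PowerIterationClusteringParams extends Params {

  @Since("2.3.0")
  final val initMode = new Param[String](this, "initMode", "The initialization algorithm. " +
    "Supported options: 'random' and 'degree'. (default: random)",
    ParamValidators.inArray[String](Array("random", "degree")))

  // The default sits right next to the Param it belongs to, so the
  // docstring and setDefault cannot drift out of sync.
  setDefault(initMode -> "random")
}
```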
 * Common params for PowerIterationClustering
 */
private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter
  with HasFeaturesCol with HasPredictionCol with HasWeightCol {
We should not use weightCol, which is for instance weights, not for this kind of adjacency. Let's add a new Param here, perhaps called neighborWeightCol.
Also, featuresCol is not used, so it should be removed.
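A sketch of the suggested Param (neighborWeightCol is the reviewer's proposed name, not an existing shared param):

```scala
/**
 * Param for the name of the input column for neighbor weights.
 * @group param
 */
final val neighborWeightCol = new Param[String](this, "neighborWeightCol",
  "column name for neighbor weights")

/** @group getParam */
def getNeighborWeightCol: String = $(neighborWeightCol)
```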
def getNeighborCol: String = $(neighborCol)

/**
 * Validates the input schema
nit: No need for a doc comment like this; it just restates the method name.
override def copy(extra: ParamMap): PowerIterationClustering = defaultCopy(extra)

@Since("2.3.0")
def this() = this(Identifiable.randomUID("PowerIterationClustering"))
nit: Put constructors first, before other methods (like copy), to match the style of the rest of MLlib.
case assignment: Assignment => Row(assignment.id, assignment.cluster)
}

val schema = transformSchema(new StructType(Array(StructField($(idCol), LongType),
You should not need to explicitly create a schema here.
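One possible reading of this suggestion (a sketch; it assumes transformSchema validates idCol and appends predictionCol, per the earlier comments, so the output schema can be derived rather than hand-built):

```scala
// Derive the output schema from the input schema instead of
// constructing a StructType by hand:
val schema = transformSchema(dataset.schema)
```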
with MLlibTestSparkContext with DefaultReadWriteTest {

@transient var data: Dataset[_] = _
@transient var malData: Dataset[_] = _
Not used
}
assert(predictions2.toSet == Set((1 until n1).toSet, (n1 until n).toSet))

val expectedColumns = Array("id", "prediction")
No need to check this since it's already checked above by result2.select(...).
@wangmiao1981 Do let me know if you're too busy now to resume this; I know it's been a long time. Thanks!
@wangmiao1981 If you're busy I can help take over this. -:)
I don't mind; I'll take it. But I'll mark @wangmiao1981 as the main contributor for the PR. Would you mind closing this issue @wangmiao1981 and I'll reopen a new PR under the same JIRA?
@jkbradley Sorry for missing your comments. Anyway, I will close it now. I will choose another one to work on. Thanks!
@jkbradley I am closing this one now. Thanks!
OK, sorry to push @wangmiao1981! I just want to make sure this gets in before I no longer have bandwidth for it. If you have the time, would you mind checking the updates I made in the new PR? Thanks!
## What changes were proposed in this pull request?

This PR adds PowerIterationClustering as a Transformer to spark.ml. In the transform method, it calls spark.mllib's PowerIterationClustering.run() method and transforms the return value assignments (the K-means output of the pseudo-eigenvector) into a DataFrame (id: LongType, cluster: IntegerType). This PR is copied and modified from apache#15770. The primary author is wangmiao1981.

## How was this patch tested?

This PR has 2 types of tests:
* Copies of tests from spark.mllib's PIC tests
* New tests specific to the spark.ml APIs

Author: [email protected] <[email protected]>
Author: wangmiao1981 <[email protected]>
Author: Joseph K. Bradley <[email protected]>

Closes apache#21090 from jkbradley/wangmiao1981-pic.
## What changes were proposed in this pull request?

As we discussed in the JIRA, `PowerIterationClustering` is added as a `Transformer`. The `featureCol` is `vector` type. In the `transform` method, it calls the MLlib `PowerIterationClustering().run(rdd)` method and transforms the return value `assignments` (the K-means output of the pseudo-eigenvector) into a DataFrame (`id`: `LongType`, `cluster`: `IntegerType`).

## How was this patch tested?

Added new unit tests similar to the MLlib `PowerIterationClustering` tests.
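For reference, a hypothetical usage sketch of the Transformer API described above (column names, setters, and defaults follow this PR's discussion and may differ from what was finally merged; `spark` is assumed to be an active SparkSession, as in spark-shell):

```scala
import org.apache.spark.ml.clustering.PowerIterationClustering

// A tiny affinity graph with two obvious clusters: {0, 1, 2} and {3, 4}.
// Each row lists a node id, its neighbors, and the corresponding edge weights.
val graph = spark.createDataFrame(Seq(
  (0L, Array(1L, 2L), Array(1.0, 1.0)),
  (1L, Array(0L, 2L), Array(1.0, 1.0)),
  (2L, Array(0L, 1L), Array(1.0, 1.0)),
  (3L, Array(4L), Array(1.0)),
  (4L, Array(3L), Array(1.0))
)).toDF("id", "neighbors", "weights")

val pic = new PowerIterationClustering()
  .setK(2)
  .setMaxIter(20)

// transform() runs spark.mllib's PIC and returns (id, prediction) assignments.
pic.transform(graph).show()
```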