
[SPARK-15784][ML]: Add Power Iteration Clustering to spark.ml #15770

Closed
wants to merge 28 commits into from

Conversation

@wangmiao1981 (Contributor) commented Nov 4, 2016

What changes were proposed in this pull request?

As we discussed in the JIRA, PowerIterationClustering is added as a Transformer. The featureCol is of vector type. In the transform method, it calls the MLlibPowerIterationClustering().run(rdd) method and converts the returned assignments (the k-means output over the pseudo-eigenvector) into a DataFrame (id: LongType, cluster: IntegerType).
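Roughly, the flow described above looks like the sketch below. This is only an illustration based on snippets quoted later in this review; the column names (id, neighbors, weights) and the parameter values (k, maxIter) are assumptions, not the final API.

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPIC}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

def transformSketch(dataset: Dataset[_]): DataFrame = {
  val spark = dataset.sparkSession
  // Flatten each (id, neighbors, weights) row into (src, dst, weight) edges.
  val edges: RDD[(Long, Long, Double)] =
    dataset.select(col("id"), col("neighbors"), col("weights")).rdd.flatMap {
      case Row(id: Long, nbrs: Vector, weights: Vector) =>
        nbrs.toArray.toIterator.zip(weights.toArray.toIterator).map {
          case (nbr, w) => (id, nbr.toLong, w)
        }
    }
  // Run the existing spark.mllib implementation and wrap its assignments in a DataFrame.
  val model = new MLlibPIC().setK(2).setMaxIterations(20).run(edges)
  val rows = model.assignments.map(a => Row(a.id, a.cluster))
  val schema = StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("cluster", IntegerType, nullable = false)))
  spark.createDataFrame(rows, schema)
}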

How was this patch tested?

Add new unit tests similar to MLlibPowerIterationClustering.

@SparkQA commented Nov 4, 2016

Test build #68148 has finished for PR 15770 at commit 8dd3ca2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 4, 2016

Test build #68150 has finished for PR 15770 at commit e29a73e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangmiao1981 (Contributor Author)

ping @yanboliang

@yanboliang (Contributor)

@wangmiao1981 Thanks for working on this. Since we are starting QA for 2.1, I will review this after the release.

@SparkQA commented Jan 7, 2017

Test build #71020 has finished for PR 15770 at commit 754112f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Since("2.2.0")
final val initMode = new Param[String](this, "initMode", "The initialization algorithm. " +
"Supported options: 'random' and 'degree'.",
(value: String) => validateInitMode(value))
Contributor

What about using the validator ParamValidators.inArray[String](...) instead?
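For reference, a minimal sketch of how the Param could be declared with that validator; this illustrates the suggestion only, and the surrounding trait is hypothetical, not the merged code.

import org.apache.spark.ml.param.{Param, Params, ParamValidators}

private[clustering] trait InitModeParamSketch extends Params {
  // Validator supplied via ParamValidators.inArray instead of a hand-written function.
  final val initMode = new Param[String](this, "initMode",
    "The initialization algorithm. Supported options: 'random' and 'degree'.",
    ParamValidators.inArray[String](Array("random", "degree")))

  def getInitMode: String = $(initMode)
}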

Contributor Author

What is the difference? Thanks!

Contributor

You would not need to write a separate validation function as you do below, and it will allow more user-friendly error messages in the future.

@SparkQA commented Feb 3, 2017

Test build #72313 has finished for PR 15770 at commit d40a6d5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.linalg.{Vector}
Contributor

No need for the braces around a single import.

import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions.{col}
Contributor

same here

"Supported options: 'random' and 'degree'.",
(value: String) => validateInitMode(value))

private[spark] def validateInitMode(initMode: String): Boolean = {
Contributor

Not needed if you apply the comment above.

@Since("2.2.0")
@Experimental
class PowerIterationClustering private[clustering] (
@Since("2.2.0") override val uid: String)
Contributor

indentation

* Validates the input schema
* @param schema input schema
*/
protected def validateSchema(schema: StructType): Unit = {
Contributor

Instead of just validating the schema, we should validate and transform. You can follow the example in https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala#L92
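For illustration, a sketch of the validate-and-transform pattern KMeans uses. It assumes the method lives in the Params trait, where $(idCol) and $(predictionCol) are in scope, and the column types shown are illustrative.

import org.apache.spark.ml.util.SchemaUtils
import org.apache.spark.sql.types.{IntegerType, LongType, StructType}

// Check the required input column and append the prediction column,
// following the KMeans example linked above.
protected def validateAndTransformSchema(schema: StructType): StructType = {
  SchemaUtils.checkColumnType(schema, $(idCol), LongType)
  SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType)
}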

Member

+1
Also:

  • This should check other input columns to make sure they are defined.
  • This should add predictionCol, not check that it exists in the input.

* Default: "id"
* @group param
*/
val idCol = new Param[String](this, "idCol", "column name for ids.")
Contributor

Instead of making an 'id' column, which does not convey much information, we should follow the example of K-Means and call it prediction. You already include the trait for that.

Contributor Author

In the MLlib implementation, the clustering result is case class Assignment(id: Long, cluster: Int): idCol is the node id and cluster is the id of the cluster that node belongs to. I think id is still useful to represent the node; otherwise, we would need to make sure that the output order of nodes is the same as the input order, which means the id is implicitly inferred from the row number.

Contributor Author

PIC is different from K-Means. K-Means' transform applies the predict method to each row of the input (i.e., each data point), while PIC's run applies k-means to the pseudo-eigenvector produced by the power iteration, so there is no one-to-one mapping from the input dataset to the result. Please also see the comments below.

def setIdCol(value: String): this.type = set(idCol, value)

@Since("2.2.0")
override def transform(dataset: Dataset[_]): DataFrame = {
Contributor

One issue with the current implementation is that extra columns are going to be discarded (this is also the case with most graphframes algorithms), which breaks the general contract of transformers. After making the transform, you should join back on the missing columns.
Also, a test for this use case would be useful.

Contributor Author

The missing column is the featureCol. You want the featureCol to be in the result DataFrame, right? Thanks!

Contributor Author

@thunterdb The prediction/transformation result (id, cluster) has a different size than the features: id denotes a node id in the graph, so the number of result rows equals the number of nodes, while the features are the graph weights and |features| is roughly |nodes|^2/2. So I can't directly join the result DataFrame with the input dataset. Please correct me if I misunderstood your comments. Thanks!

}
}

test("power iteration clustering") {
Contributor

Can you also add a test with a DataFrame that has some extra data in it?

Contributor Author

Added it.

@wangmiao1981 (Contributor Author)

@thunterdb Thanks for your review! I will address the comments soon.

@thunterdb (Contributor)

@wangmiao1981 Thanks a lot! I would be very happy to see this PR in Spark 2.2, and I will gladly help you with that.

@SparkQA commented Feb 18, 2017

Test build #73079 has finished for PR 15770 at commit cb6c0bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@thunterdb (Contributor)

You are right, I had forgotten that for this algorithm, the input is the edges, and the output is the label for each of the vertices.

This is a tricky algorithm to expose as a transformer, since it does not follow the usual convention that data should only be appended to the dataframe. I suggest we follow the same example as ALS and the mllib implementation of PIC:

  • let's make it an estimator that returns a model: the model contains the labels for each of the points in a dataframe (the current output of transform)
  • the model's transform method then takes points with an id and joins them with the model to append a column of labels. This is the same as ALS.

If we do not follow this pattern, then the model selection algorithms are not going to work. What do you think?
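For illustration only, a rough sketch of the estimator/model shape being proposed here; all names are hypothetical, and the API that was eventually merged differs.

import org.apache.spark.sql.{DataFrame, Dataset}

// Hypothetical model holding the (id, prediction) assignments computed by fit().
class PICModelSketch(assignments: DataFrame, idCol: String = "id") {
  // Like ALSModel.transform: join the stored labels onto the incoming rows by id,
  // appending the prediction column while keeping every other input column.
  def transform(dataset: Dataset[_]): DataFrame = {
    dataset.join(assignments, dataset(idCol) === assignments(idCol), "left")
      .drop(assignments(idCol))
  }
}

// Hypothetical estimator: fit() runs PIC over the edge data and captures the assignments.
class PICEstimatorSketch {
  def fit(edges: DataFrame): PICModelSketch = {
    val assignments: DataFrame = ??? // run power iteration clustering over `edges`
    new PICModelSketch(assignments)
  }
}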

@wangmiao1981 (Contributor Author)

@thunterdb Thanks for your response. In the original JIRA, we discussed why we want it to be a transformer. Let me find that discussion and post it here.

@wangmiao1981 (Contributor Author)

Joseph K. Bradley added a comment - 31/Oct/16 18:14

Miao Wang Sorry for the slow response here. I do want us to add PIC to spark.ml, but we should discuss the design before the PR. Could you please close the PR for now but save the branch to re-open after discussion?

Let's have a design discussion first.

I agree that the big issue is that there isn't a clear way to make predictions on new data points. In fact, I've never heard of people trying to do so. Has anyone else?

Assuming that prediction is not meaningful for PIC, then I don't think the algorithm fits within the Pipeline framework, though it's debatable. I see a few options:

  1. Put PIC in Pipelines as a Transformer, not an Estimator. We would just need to document that it is a very expensive Transformer.
  2. Put PIC in spark.ml as a static method. We may have to do this anyways to support all of spark.mllib's Statistics.
  3. Put PIC in GraphFrames (and push harder for GraphFrames to be merged back into Spark, which will include a much longer set of improvements).

My top choice is PIC as a Transformer. What do you think?

CC Yanbo Liang, Seth Hendrickson, Nick Pentreath: opinions?

Seth Hendrickson (sethah) added a comment - 31/Oct/16 22:40

This seems like it fits the framework of a feature transformer. We could generate a real-valued feature column using PIC algorithm where the values are just the components of the pseudo-eigenvector. Alternatively we could pipeline a KMeans clustering on the end, but I think it makes more sense to let users do that themselves - but that's up for debate.

@wangmiao1981 (Contributor Author)

Yanbo Liang added a comment - 02/Nov/16 09:30 - edited

I prefer #1 and #3, but it looks like we can achieve both goals.
A graph can be represented by GraphX/GraphFrame or DataFrame/RDD. A PIC model can be trained on either, but we use GraphX operators in the internal implementation, which means input data should be converted to the GraphX representation if it's an RDD of tuples. So it's straightforward to make PIC one of the algorithms in GraphX (or GraphFrame when it is merged back into Spark). However, users may load their graph as a DataFrame/RDD and transform it via an ML Pipeline, which should also be supported, so it's better if we can wrap the GraphX/GraphFrame PIC as a Pipeline stage so that ML users can use it as well.
For some historical reasons (we don't want to add new features to GraphX), I propose splitting this task into the following steps:

  1. Put PIC in the Pipeline as a Transformer, using the GraphX operators in the implementation (this is consistent with Joseph K. Bradley's proposal).
  2. Add PIC algorithms to GraphFrames when it is merged into Spark.
  3. Make the ML PIC a wrapper that calls the GraphFrames PIC implementation.

I think this approach serves both kinds of users (ML users and GraphFrames users) better, but I'm still open to your thoughts. Thanks.

@wangmiao1981 (Contributor Author)

I am checking ALS out to understand your suggestions. Thanks!

@SparkQA commented Feb 21, 2017

Test build #73238 has finished for PR 15770 at commit f53765b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@thunterdb (Contributor)

@wangmiao1981 yes I had seen the discussions there. I believe that eventually PIC should be moved into graphframes, but we can have a simple API in spark.ml for the time being.

@wangmiao1981 (Contributor Author)

@thunterdb Per the discussion with Yanbo, there is one concern about making it an Estimator: every transform would incur an additional data shuffle. cc @yanboliang @jkbradley Thanks!

@yanboliang (Contributor)

@thunterdb I have two concerns about making PIC an Estimator:

  • As you suggest, the model's transform would introduce an extra data shuffle compared with the original implementation.
  • It cannot fit into the pipeline well, since both the estimator's fit and the model's transform methods should work on a dataset with the same schema, like:
val model = new ALS().fit(dataset)
model.transform(dataset)

But in your suggestion, the input of the PIC estimator is the edges, an RDD[(Long, Long, Double)], while the input of the PIC model is the vertex IDs.

I totally agree that making PIC a Transformer is tricky, but I did not find a better way. Thanks.

@wangmiao1981 (Contributor Author)

ping @WeichenXu123

@WeichenXu123 (Contributor)

@wangmiao1981 Sorry for the delay, I will take a look later, thanks!

@WeichenXu123 (Contributor) left a comment

I'm also concerned about the case of a high-centrality graph mentioned by @thunterdb.
It also seems we need a helper function converting a Graph structure into an adjacency-list-format DataFrame, so that users can use this transformer conveniently?

val sparkSession = dataset.sparkSession
val rdd: RDD[(Long, Long, Double)] =
  dataset.select(col($(idCol)), col($(neighborCol)), col($(weightCol))).rdd.flatMap {
    case Row(id: Long, nbr: Vector, weight: Vector) =>
Contributor

PIC requires the input graph matrix to be symmetric, and the weights should be non-negative. It would be better to check them here, but checking symmetry seems too costly; I have no good idea for now. cc @jkbradley Do you have any thoughts?
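As a minimal sketch, the cheap per-row checks (matched lengths, non-negative weights) could look like the following; the symmetry check is deliberately omitted, and this only illustrates where such checks could live inside the flatMap above.

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Sketch: per-row validation before emitting (src, dst, weight) edges.
val toEdges: PartialFunction[Row, Iterator[(Long, Long, Double)]] = {
  case Row(id: Long, nbr: Vector, weight: Vector) =>
    require(nbr.size == weight.size,
      s"Row $id: the neighbor list and the weight list must have the same length.")
    require(weight.toArray.forall(_ >= 0.0),
      s"Row $id: edge weights must be non-negative.")
    nbr.toArray.toIterator.zip(weight.toArray.toIterator).map {
      case (n, w) => (id, n.toLong, w)
    }
}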

Contributor Author

I think checking symmetry is too expensive for PIC in this data format. Maybe we can omit the check and add a comment plus an INFO log so users take care of it themselves. @WeichenXu123

Contributor

OK I agree.

Member

I agree about not checking for symmetry as long as we document it.

But I do have one suggestion: Let's take neighbors and weights as Arrays, not Vectors. That may help prevent users from mistakenly passing in feature Vectors.
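A sketch of what the Array-based input might look like; the column names are illustrative, and array columns arrive in a Row as Seq, hence the Seq patterns below.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.col

// Sketch: take neighbors and weights as array columns instead of Vectors.
def edgesFromArrays(dataset: Dataset[_], idCol: String,
    neighborCol: String, weightCol: String): RDD[(Long, Long, Double)] = {
  dataset.select(col(idCol), col(neighborCol), col(weightCol)).rdd.flatMap {
    case Row(id: Long, nbrs: Seq[Long @unchecked], weights: Seq[Double @unchecked]) =>
      require(nbrs.size == weights.size,
        "The length of the neighbor list must equal the length of the weight list.")
      nbrs.iterator.zip(weights.iterator).map { case (n, w) => (id, n, w) }
  }
}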

require(nbr.size == weight.size,
  "The length of the neighbor list must be equal to the length of the weight list.")
val ids = Array.fill(nbr.size)(id)
for (i <- 0 until ids.size) yield (ids(i), nbr(i).toLong, weight(i))}
Contributor

This line of code does not perform well; why do you expand the id into an array filled with the same value?
You can use:

nbr.toArray.toIterator.zip(weight.toArray.toIterator).map(x => (id, x._1.toLong, x._2))

Contributor Author

I remember last time you mentioned not using zip; I might have gotten that wrong. By the way, it seems we need a helper function: do you want a helper that takes a GraphFrame and returns the RDD used by PIC?

Contributor

Ah, I don't like Array.zip because it generates a temporary array, but Array.toIterator.zip does not have this problem; it is an iterator zip.

@wangmiao1981 (Contributor Author)

I will address the review comments soon. Thanks! @WeichenXu123

@SparkQA commented Oct 25, 2017

Test build #83064 has finished for PR 15770 at commit 752b685.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 30, 2017

Test build #83230 has finished for PR 15770 at commit cfa18af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangmiao1981 (Contributor Author)

@WeichenXu123, for the graph helper, MLlib has a version that takes Graph[Double, Double] as a parameter for training. In ML, do we have to provide a Dataset of the Graph? Can you specify the requirement? I have addressed your other comments. Thanks!

@WeichenXu123 (Contributor)

@wangmiao1981 Oh, not a big deal. What I was thinking is that users may use the graphx package to get a Graph[Double, Double], and the ml package cannot accept this format, so they would have to convert it into a DataFrame (and since the mllib API is deprecated, users may not want to use it).
But this can be discussed in a separate JIRA.

@wangmiao1981 (Contributor Author)

@WeichenXu123 Thanks for your review and reply! I agree with you that the helper can be discussed later as a potential enhancement.

@wangmiao1981 (Contributor Author)

@WeichenXu123 Any other comments? Thanks!

@WeichenXu123 (Contributor)

LGTM. ping @yanboliang

@wangmiao1981 (Contributor Author)

ping @yanboliang


@jkbradley (Member)

Just pinged @yanboliang on JIRA about me taking over shepherding this. It will need at least one update: change Since versions from 2.3.0 to 2.4.0. Sorry for the long wait @wangmiao1981 :(

@jkbradley (Member) left a comment

I just reviewed this. Overall, it looks good. In addition to some comments here, I had some comments about the text in docstrings. For that text, I figured it'd be easiest to send a PR to your PR here: wangmiao1981#4

It'd be nice to add a test for invalid input (a sketch follows after this list):

  • a negative weight
  • mismatched lengths for neighbors & weights in one row
  • (not symmetry since we don't check that)

Thanks @wangmiao1981 !
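For illustration, a minimal sketch of what such invalid-input tests might look like; the column names, setters, and Array-based input follow the suggestions in this review and are assumptions, not the tests that were eventually merged.

test("invalid input: negative weight or mismatched lengths") {
  // One row with a negative edge weight.
  val badWeights = spark.createDataFrame(Seq(
    (0L, Array(1L), Array(-1.0))
  )).toDF("id", "neighbors", "weights")

  // One row whose neighbor and weight lists have different lengths.
  val badLengths = spark.createDataFrame(Seq(
    (0L, Array(1L, 2L), Array(0.5))
  )).toDF("id", "neighbors", "weights")

  val pic = new PowerIterationClustering().setK(2)

  intercept[Exception] { pic.transform(badWeights).collect() }
  intercept[Exception] { pic.transform(badLengths).collect() }
}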


@Since("2.3.0") override val uid: String)
extends Transformer with PowerIterationClusteringParams with DefaultParamsWritable {

setDefault(
Member

nit: It'd be nice to put these defaults right next to the Param definitions in PowerIterationClusteringParams so that the default specified in the docstring is close to the default specified by setDefault (to make sure they stay in sync).
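For example, a sketch of keeping the default beside the definition inside the Params trait (the default value shown here is illustrative):

final val initMode = new Param[String](this, "initMode",
  "The initialization algorithm. Supported options: 'random' and 'degree'.",
  ParamValidators.inArray[String](Array("random", "degree")))
setDefault(initMode -> "random")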

* Common params for PowerIterationClustering
*/
private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter
with HasFeaturesCol with HasPredictionCol with HasWeightCol {
Member

We should not use weightCol, which is for instance weights, not for this kind of adjacency. Let's add a new Param here, perhaps called neighborWeightCol.
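A sketch of what such a Param might look like in the Params trait; the name neighborWeightCol is the suggestion above, not a finalized API.

/**
 * Param for the name of the input column holding the edge weight list for each row's neighbors.
 * @group param
 */
final val neighborWeightCol = new Param[String](this, "neighborWeightCol",
  "Column name for the list of edge weights corresponding to each neighbor.")

/** @group getParam */
def getNeighborWeightCol: String = $(neighborWeightCol)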

Member

Also, featuresCol is not used, so it should be removed.


def getNeighborCol: String = $(neighborCol)

/**
* Validates the input schema
Member

nit: No need for a doc comment like this; it's already explained by the method name.

override def copy(extra: ParamMap): PowerIterationClustering = defaultCopy(extra)

@Since("2.3.0")
def this() = this(Identifiable.randomUID("PowerIterationClustering"))
Member

nit: Put constructors first, before other methods (like copy), to match the style of the rest of MLlib.

case assignment: Assignment => Row(assignment.id, assignment.cluster)
}

val schema = transformSchema(new StructType(Array(StructField($(idCol), LongType),
Member

You should not need to explicitly create a schema here.

with MLlibTestSparkContext with DefaultReadWriteTest {

@transient var data: Dataset[_] = _
@transient var malData: Dataset[_] = _
Member

Not used

}
assert(predictions2.toSet == Set((1 until n1).toSet, (n1 until n).toSet))

val expectedColumns = Array("id", "prediction")
Member

No need to check this since it's already checked above by result2.select(...)

@jkbradley (Member)

@wangmiao1981 Do let me know if you're too busy now to resume this; I know it's been a long time. Thanks!

@WeichenXu123 (Contributor)

@wangmiao1981 If you're busy, I can help take over this. :-)

@jkbradley (Member)

I don't mind; I'll take it. But I'll mark @wangmiao1981 as the main contributor for the PR. Would you mind closing this issue @wangmiao1981 and I'll reopen a new PR under the same JIRA?

@wangmiao1981 (Contributor Author)

@jkbradley Sorry for missing your comments. Anyway, I will close it now. I will choose another one to work on. Thanks!

@wangmiao1981 (Contributor Author)

@jkbradley I am closing this one now. Thanks!

@jkbradley (Member)

OK sorry to push @wangmiao1981 ! I just want to make sure this gets in before I no longer have bandwidth for it. If you have the time, would you mind checking the updates I made in the new PR? Thanks!

ghost pushed a commit to dbtsai/spark that referenced this pull request Apr 19, 2018
## What changes were proposed in this pull request?

This PR adds PowerIterationClustering as a Transformer to spark.ml.  In the transform method, it calls spark.mllib's PowerIterationClustering.run() method and transforms the return value assignments (the Kmeans output of the pseudo-eigenvector) as a DataFrame (id: LongType, cluster: IntegerType).

This PR is copied and modified from apache#15770. The primary author is wangmiao1981.

## How was this patch tested?

This PR has 2 types of tests:
* Copies of tests from spark.mllib's PIC tests
* New tests specific to the spark.ml APIs

Author: [email protected] <[email protected]>
Author: wangmiao1981 <[email protected]>
Author: Joseph K. Bradley <[email protected]>

Closes apache#21090 from jkbradley/wangmiao1981-pic.