[SPARK-29967][ML][PYTHON] KMeans support instance weighting #26739
Conversation
if (iteration == 0) {
  instr.foreach(_.logNumExamples(collected.values.map(_._2).sum))
}
We don't have counts any more. Is it OK to remove this?
Can you just log the sum of weights? It keeps the same info in the unweighted case, and it's still sort of meaningful as 'number of examples' in the weighted case.
1. I guess we need to add a new var count: Long to get the total count of the dataset, since in other algorithms like LinearSVC and LogisticRegression, instr.logNumExamples logs the unweighted count;
2. Since more and more algorithms support weightCol, I think we could add a new method like instr.logSumOfWeights.
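To illustrate the distinction being discussed, here is a minimal pure-Scala sketch of tracking both quantities in one pass: the unweighted example count (what instr.logNumExamples reports) and the sum of instance weights (what the proposed instr.logSumOfWeights would report). The names and the helper itself are illustrative, not the actual Instrumentation implementation.

```scala
// Illustrative helper: one pass over (point, weight) pairs, returning
// (unweighted count, sum of weights). With uniform weights of 1.0 the
// two values coincide, which is why logNumExamples alone sufficed before.
object WeightStats {
  def countAndWeightSum(instances: Seq[(Array[Double], Double)]): (Long, Double) =
    instances.foldLeft((0L, 0.0)) { case ((count, weightSum), (_, weight)) =>
      (count + 1L, weightSum + weight)
    }
}
```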
I guess maybe leave the code this way for now, and open a separate PR later on to add a method instr.logSumOfWeights and use it in all the algorithms that support weights?
I am OK with adding a new instr.log method in another PR. Here I prefer to keep instr.logNumExamples logging the unweighted count, to stay in sync with the other algorithms.
Updated, thanks! I also added logSumOfWeights. I will update the other algorithms that have weightCol once this PR is merged.
Test build #114735 has finished for PR 26739 at commit
Looks reasonable.
}

private[spark] def run(
    data: RDD[Vector],
private[spark] def runWithweight(
Nit: runWithWeight
val clusterWeightSum = Array.fill(thisCenters.length)(0.0)

pointsAndWeights.foreach { case (point, weight) =>
  var (bestCenter, cost) = distanceMeasureInstance.findClosest(thisCenters, point)
Total nit, but you can use val and then pass cost * weight to costAccum.add.
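A minimal sketch of the reviewer's suggestion, using plain Scala collections instead of Spark. The findClosest here is a stand-in for distanceMeasureInstance.findClosest (assumed to return the best center index and its squared distance), and the local acc stands in for costAccum; neither is the actual Spark implementation.

```scala
// Stand-in for distanceMeasureInstance.findClosest: returns the index of the
// nearest center (by squared Euclidean distance) and the distance itself.
def findClosest(centers: Array[Array[Double]], point: Array[Double]): (Int, Double) = {
  val costs = centers.map(c => c.zip(point).map { case (a, b) => (a - b) * (a - b) }.sum)
  val best = costs.indices.minBy(i => costs(i))
  (best, costs(best))
}

// The suggested shape: destructure with val (not var) and accumulate
// cost * weight directly, rather than mutating cost afterwards.
def totalWeightedCost(
    centers: Array[Array[Double]],
    pointsAndWeights: Seq[(Array[Double], Double)]): Double = {
  var acc = 0.0 // local stand-in for costAccum
  pointsAndWeights.foreach { case (point, weight) =>
    val (bestCenter, cost) = findClosest(centers, point)
    acc += cost * weight // pass cost * weight to the accumulator
  }
  acc
}
```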
Test build #114794 has finished for PR 26739 at commit
}

val instances: RDD[(OldVector, Double)] = dataset.select(
  DatasetUtils.columnToVector(dataset, getFeaturesCol),
Nit: why break this line? You could write:
dataset
  .select(DatasetUtils.columnToVector(dataset, getFeaturesCol), w)
  .rdd.map { ...
I will update the format.
@@ -100,6 +100,18 @@ private[spark] abstract class DistanceMeasure extends Serializable {
  new VectorWithNorm(sum)
}

/**
Is the above def centroid(sum: Vector, count: Long): VectorWithNorm still needed?
Yes, it is still used by BisectingKMeans.
// clusterWeightSum is needed to calculate cluster center
// cluster center =
//   sample1 * weight1/clusterWeightSum + sample2 * weight2/clusterWeightSum + ...
val clusterWeightSum = Array.fill(thisCenters.length)(0.0)
Nit: Array.ofDim[Double](thisCenters.length) or new Array[Double](thisCenters.length)
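The weighted-center formula from the code comment above can be sketched in plain Scala. This is an illustrative, single-cluster version of the math, not the actual Spark aggregation; it uses the reviewer's suggested new Array[Double](...) allocation style.

```scala
// Weighted cluster center for one cluster:
//   center = (w1*x1 + w2*x2 + ...) / (w1 + w2 + ...)
// i.e. each point contributes weight_i / clusterWeightSum of itself.
def weightedCenter(pointsAndWeights: Seq[(Array[Double], Double)]): Array[Double] = {
  val dim = pointsAndWeights.head._1.length
  val sum = new Array[Double](dim) // suggested allocation style
  var clusterWeightSum = 0.0
  pointsAndWeights.foreach { case (point, weight) =>
    var i = 0
    while (i < dim) {
      sum(i) += point(i) * weight
      i += 1
    }
    clusterWeightSum += weight
  }
  sum.map(_ / clusterWeightSum)
}
```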
Test build #114831 has finished for PR 26739 at commit
Test build #114869 has finished for PR 26739 at commit
}.collectAsMap()

if (iteration == 0) {
-  instr.foreach(_.logNumExamples(collected.values.map(_._2).sum))
+  instr.foreach(_.logNumExamples(data.count()))
}
Nit: what about using a sc.longAccumulator to accumulate the count, like costAccum?
Updated. Thanks!
Test build #115046 has finished for PR 26739 at commit
Merged to master
Thanks! @srowen @zhengruifeng
What changes were proposed in this pull request?
Add instance weight support in KMeans.
Why are the changes needed?
KMeans should support instance weighting.
Does this PR introduce any user-facing change?
Yes.
KMeans.setWeightCol
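For reference, the new user-facing API could be used roughly like this. This is a sketch against the Spark ML KMeans estimator; the DataFrame df and its column names ("features", "weight") are made up for illustration.

```scala
import org.apache.spark.ml.clustering.KMeans

// df is assumed to be a DataFrame with a vector column "features"
// and a double column "weight" holding per-instance weights.
val kmeans = new KMeans()
  .setK(2)
  .setFeaturesCol("features")
  .setWeightCol("weight") // new in this PR (SPARK-29967)
val model = kmeans.fit(df)
```

With weightCol unset (or all weights equal to 1.0), the fit should behave as before.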
How was this patch tested?
Unit Tests