
[SPARK-30102][ML][PYSPARK] GMM supports instance weighting #26735

Closed · wants to merge 8 commits

Conversation

zhengruifeng
Contributor

What changes were proposed in this pull request?

supports instance weighting in GMM

Why are the changes needed?

ML should support instance weighting

Does this PR introduce any user-facing change?

yes, a new param `weightCol` is exposed

How was this patch tested?

added test suites

```scala
@@ -267,6 +268,18 @@ class GaussianMixtureSuite extends MLTest with DefaultReadWriteTest {
    assert(trueLikelihood ~== floatLikelihood absTol 1e-6)
  }

  test("GMM support instance weighting") {
```
Contributor Author (zhengruifeng):

The final model is highly sensitive to the initialization, which is computed in `initRandom` and closely tied to the data distribution and partitioning.

@SparkQA

SparkQA commented Dec 2, 2019

Test build #114725 has finished for PR 26735 at commit 5e898e2.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

srowen (Member) left a comment:

I don't suppose there's any clean way to avoid the added computation for weights when the weights are all 1? Looks hard. I'm OK with it.

@zhengruifeng zhengruifeng changed the title [SPARK-30102][ML][PYSPARK] GMM supports instance weighting [SPARK-30102][WIP][ML][PYSPARK] GMM supports instance weighting Dec 5, 2019
@zhengruifeng
Contributor Author

zhengruifeng commented Dec 9, 2019

It really took me some days to look into the test failure.

1. In 2.4.4, I could not reproduce the doctest results locally:

    >>> summary.clusterSizes
    [2, 2, 2]
    >>> summary.logLikelihood
    8.14636...

until I explicitly set the number of partitions to 2, like this: `df = spark.createDataFrame(sc.parallelize(data, 2), ["features"])`.
That is because, on my laptop, the existing `df = spark.createDataFrame(data, ["features"])` creates a df with 12 partitions, and GMM is highly sensitive to the initialization.
It is also weird to me that `spark.createDataFrame` creates a df with 6 partitions on the Scala side.
My laptop has an 8850 CPU with 6 cores and 12 threads.

2. After using `df = spark.createDataFrame(sc.parallelize(data, 2), ["features"])`, I can reproduce the results of 2.4.4. However, the doctests in this PR still fail. I logged the optimization metric logLikelihood after each iteration and found what looks like a sudden numeric change.

| Iteration | Master | This PR |
|---|---|---|
| 0 | -13.306466494963615 | -13.306466494963615 |
| 1 | -0.4307654468425961 | -0.430765446842597 |
| 2 | 0.49157579336057605 | 0.4915757933605755 |
| 3 | 2.234212048899172 | 2.234212048899182 |
| 4 | 6.125367537295512 | 6.125367537295558 |
| 5 | 11.27762326533469 | 11.277623265335476 |
| 6 | 35.232285502171976 | 35.229680601767065 |
| 7 | 10.028821186214191 | 46.33491773124833 |
| 8 | 23.693392686726106 | 57.694248782061024 |
| 9 | 8.146360246481793 | 26.193922336279954 |

The metrics are close before iteration 7, but a sudden numeric change happens at iteration 7. I think that is acceptable, since the internal computation is complex.
Moreover, the current convergence check `math.abs(logLikelihood - logLikelihoodPrev) > $(tol)` does not work when the optimization objective takes a big hit, like logLikelihood dropping from 35.232285502171976 to 10.028821186214191 at iteration 7.

So I think I need to:
1. change the df generation logic to explicitly set the number of partitions (the current `createDataFrame` does not support this input, so I need to create an RDD first);
2. change the result in the doctest (I tend to set maxIter=5 and result=6.125);
3. change the convergence check to avoid big drops in the optimization metric (maybe in another PR, and check other algorithms there); see the sketch below.
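
For item 3, a minimal sketch of what such a check could look like, keeping the current `tol` semantics; this is illustrative only, not the actual GaussianMixture code:

```scala
// Illustrative only: EM should never decrease the log-likelihood, so a
// large drop signals numerical trouble rather than convergence. This
// check only declares convergence on a small non-negative improvement.
def converged(logLikelihood: Double, logLikelihoodPrev: Double, tol: Double): Boolean = {
  val diff = logLikelihood - logLikelihoodPrev
  diff >= 0.0 && diff < tol
}
```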

@srowen @huaxingao What do you think?

@zhengruifeng
Contributor Author

> I don't suppose there's any clean way to avoid the added computation for weights when the weights are all 1? Looks hard. I'm OK with it.

One approach is to have two methods:
1. `def add(instance: Vector, weight: Double): this.type`, which delegates to the unweighted version when `weight == 1`;
2. `def add(instance: Vector): this.type`.

The second method does not involve any extra multiplication. A sketch is below.
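
A minimal sketch of that dispatch, assuming a summarizer with a dense `sum` buffer and a `weightSum` counter (the class and field names are hypothetical, not Spark's actual GMM internals):

```scala
import org.apache.spark.ml.linalg.Vector

// Hypothetical aggregator illustrating the weight == 1.0 fast path.
class WeightedSummarizer(numFeatures: Int) {
  private val sum = Array.ofDim[Double](numFeatures)
  private var weightSum = 0.0

  def add(instance: Vector, weight: Double): this.type = {
    if (weight == 1.0) {
      add(instance) // fast path: no per-element multiplication
    } else {
      instance.foreachActive { (i, v) => sum(i) += v * weight }
      weightSum += weight
      this
    }
  }

  def add(instance: Vector): this.type = {
    instance.foreachActive { (i, v) => sum(i) += v }
    weightSum += 1.0
    this
  }
}
```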

@SparkQA

SparkQA commented Dec 9, 2019

Test build #115008 has finished for PR 26735 at commit a068214.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@huaxingao
Contributor

huaxingao commented Dec 9, 2019

I guess instead of setting maxIter=5 and comparing the logLikelihood at iteration 5, maybe use a much bigger maxIter so it converges, and compare the logLikelihood at convergence.

It puzzles me why the logLikelihoods from iteration 7 on are so different from the logLikelihoods computed using the original code. Weight is not set in the Python doctest, so it uses the default 1.0. In theory, this should behave exactly the same as the original code, and the logLikelihood at each iteration should be very similar to the logLikelihood computed using the original code, right?

I tried both the original code and the code with changes, they start to have different logLikelihood at iteration 7, but both of them converge at iteration 25, with the same logLikelihood 65.02945125241477.

I agree that we probably need to change the current convergence check. It seems to me that we also need to compare the logLikelihood difference to the previous difference: the differences should get smaller and smaller and eventually converge. However, when I tested with the current code, the logLikelihood differences do not shrink consistently.

| Iteration | logLikelihoodPrev | logLikelihood | diff |
|---|---|---|---|
| 15 | 36.402816949681664 | 36.55682231506764 | 0.1540053653859772 |
| 16 | 36.55682231506764 | 36.75888971475007 | 0.20206739968242715 |
| 17 | 36.75888971475007 | 37.581643170088086 | 0.8227534553380167 |
| 18 | 37.581643170088086 | 6.674670202869423 | 30.906972967218664 |
| 19 | 6.674670202869423 | 10.601046748584544 | 3.9263765457151205 |
| 20 | 10.601046748584544 | 39.71941181091317 | 29.11836506232863 |
| 21 | 39.71941181091317 | 49.2147989416624 | 9.49538713074923 |
| 22 | 49.2147989416624 | 76.11383657713708 | 26.899037635474677 |
| 23 | 76.11383657713708 | 71.28238165058754 | 4.83145492654954 |
| 24 | 71.28238165058754 | 65.02945125241477 | 6.252930398172765 |
| 25 | 65.02945125241477 | 65.02945125241477 | 0.0 |

@zhengruifeng
Contributor Author

@huaxingao Thanks a lot for looking into this!
Yes, it is reasonable to set maxIter=25 for a stable result, and I still need to look into the reason for the divergence at iteration 7.
Also, in theory the logLikelihood should be increasing, so we should check the training procedure.

@srowen
Member

srowen commented Dec 21, 2019

@zhengruifeng does this need more investigation?

@zhengruifeng
Contributor Author

retest this please

@zhengruifeng
Contributor Author

zhengruifeng commented Dec 23, 2019

@huaxingao I found that the difference comes from the method used to compute `var sumWeights`.
In master, it keeps the weight of each cluster and gets the sum via `sums.weights.sum`,
while in the previous commit I used a separate var to keep the running sum. The difference is tiny, but it unfortunately causes a sudden divergence at iteration 7 (although the two implementations finally converge to the same result).
I reverted the computation of `sumWeights` and now the doctest works fine.
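
A toy illustration of why two ways of summing the same weights can disagree (this is generic floating-point behavior, not the GMM code): the order and grouping of additions affects rounding, so a running accumulator and a sum over stored values need not agree bit-for-bit.

```scala
// Generic floating-point demo, unrelated to Spark: the same numbers summed
// in a different order can round to different results (the true sum is 2.0).
val xs = Array(1e16, 1.0, -1e16, 1.0)
println(xs.sum)        // 1.0: the first 1.0 is absorbed by 1e16
println(xs.sorted.sum) // 0.0: with this ordering both 1.0s are absorbed
```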

@srowen I think this PR should be OK to merge. However, as discussed above, we can see that the convergence of GMM is not stable: the logLikelihood may even drop sharply during the training procedure (which should not happen in theory). I think we need to make GMM more numerically stable in the future, but as far as supporting instance weighting goes, I think the current PR is OK.

@zhengruifeng zhengruifeng changed the title [SPARK-30102][WIP][ML][PYSPARK] GMM supports instance weighting [SPARK-30102][ML][PYSPARK] GMM supports instance weighting Dec 23, 2019
```scala
@@ -41,6 +41,24 @@ private[spark] object MetadataUtils {
    }
  }

  /**
   * Examine a schema to identify the number of features in a vector column.
```
Contributor Author (zhengruifeng):

The same method is also added in the PR for RobustScaler; I plan to merge that one first and then rebase this PR.

srowen (Member) left a comment:

Looks OK to me.

```scala
- slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) ^:^ 2.0)
+ var j = 0
+ while (j < numSamples) {
+   ss += ((sampleSlice(j).asBreeze - mean.asBreeze) ^:^ 2.0) * weightSlice(j)
```
srowen (Member):

There's a similar bit of code I had to change recently to compile in Scala 2.13. You don't have to address it here, but I think that using `val v: BV[Double] = sampleSlice(j).asBreeze - mean.asBreeze` first fixes it.
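
A sketch of how the loop in this diff might look with that change (the surrounding names come from the diff itself; `BV` is assumed to be the usual Breeze alias, and the loop increment is filled in):

```scala
// Sketch of the suggested Scala 2.13-friendly form: bind the difference
// to an explicitly typed val before applying the element-wise power.
import breeze.linalg.{Vector => BV}

var j = 0
while (j < numSamples) {
  val v: BV[Double] = sampleSlice(j).asBreeze - mean.asBreeze
  ss += (v ^:^ 2.0) * weightSlice(j)
  j += 1
}
```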

```scala
val weightSum = sampleWeights.sum

val gaussians = Array.tabulate(numClusters) { i =>
  val sampleSlice = samples.view(i * numSamples, (i + 1) * numSamples)
```
srowen (Member):

Nit: you could factor out `i * numSamples` here and use it 4 times. Might be very slightly tidier.
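
What that factoring could look like (a sketch only; `sampleSlice` is the one slice visible in the diff, and the other use of the offset plus the helper are assumed for illustration):

```scala
val gaussians = Array.tabulate(numClusters) { i =>
  // compute the offset once instead of repeating i * numSamples
  val offset = i * numSamples
  val sampleSlice = samples.view(offset, offset + numSamples)
  val weightSlice = sampleWeights.view(offset, offset + numSamples) // assumed sibling slice
  makeGaussian(sampleSlice, weightSlice) // hypothetical stand-in for the real body
}
```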

@SparkQA

SparkQA commented Dec 23, 2019

Test build #115659 has finished for PR 26735 at commit 50e15fe.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 24, 2019

Test build #115668 has finished for PR 26735 at commit 26f6564.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng
Contributor Author

retest this please

@SparkQA

SparkQA commented Dec 24, 2019

Test build #115725 has finished for PR 26735 at commit 26f6564.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 25, 2019

Test build #115754 has finished for PR 26735 at commit 53efa70.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng
Contributor Author

Merged to master, thanks @srowen @huaxingao for reviewing!

@zhengruifeng zhengruifeng deleted the gmm_support_weight branch December 27, 2019 05:34
fqaiser94 pushed a commit to fqaiser94/spark that referenced this pull request Mar 30, 2020
### What changes were proposed in this pull request?
supports instance weighting in GMM

### Why are the changes needed?
ML should support instance weighting

### Does this PR introduce any user-facing change?
yes, a new param `weightCol` is exposed

### How was this patch tested?
added test suites

Closes apache#26735 from zhengruifeng/gmm_support_weight.

Authored-by: zhengruifeng <[email protected]>
Signed-off-by: zhengruifeng <[email protected]>
dongjoon-hyun added a commit that referenced this pull request Nov 14, 2021
### What changes were proposed in this pull request?

This PR aims to reduce the test weight from `100` to `90` to improve the robustness of test case `GMM support instance weighting`.
```scala
test("GMM support instance weighting") {
  val gm1 = new GaussianMixture().setK(k).setMaxIter(20).setSeed(seed)
  val gm2 = new GaussianMixture().setK(k).setMaxIter(20).setSeed(seed).setWeightCol("weight")
- Seq(1.0, 10.0, 100.0).foreach { w =>
+ Seq(1.0, 10.0, 90.0).foreach { w =>
```

### Why are the changes needed?

As mentioned in #26735 (comment), the weights of the model change as the instance weights grow, and the final weight `100` seems to be high enough to cause failures on some JVMs. This is observed in Java 17 on M1 in native mode.

```
$ java -version
openjdk version "17" 2021-09-14 LTS
OpenJDK Runtime Environment Zulu17.28+13-CA (build 17+35-LTS)
OpenJDK 64-Bit Server VM Zulu17.28+13-CA (build 17+35-LTS, mixed mode, sharing)
```

**BEFORE**
```
$ build/sbt "mllib/test"
...
[info] - GMM support instance weighting *** FAILED *** (1 second, 722 milliseconds)
[info]   Expected 0.10476714410584752 and 1.209081654091291E-14 to be within 0.001 using absolute tolerance. (TestingUtils.scala:88)
...
[info] *** 1 TEST FAILED ***
[error] Failed: Total 1760, Failed 1, Errors 0, Passed 1759, Ignored 7
[error] Failed tests:
[error] 	org.apache.spark.ml.clustering.GaussianMixtureSuite
[error] (mllib / Test / test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 625 s (10:25), completed Nov 13, 2021, 6:21:13 PM
```

**AFTER**
```
[info] Total number of tests run: 1638
[info] Suites: completed 205, aborted 0
[info] Tests: succeeded 1638, failed 0, canceled 0, ignored 7, pending 0
[info] All tests passed.
[info] Passed: Total 1760, Failed 0, Errors 0, Passed 1760, Ignored 7
[success] Total time: 568 s (09:28), completed Nov 13, 2021, 6:09:16 PM
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

Closes #34584 from dongjoon-hyun/SPARK-37317.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun added a commit that referenced this pull request Nov 14, 2021
(cherry picked from commit a1f2ae0; same commit message as above)
sunchao pushed a commit to sunchao/spark that referenced this pull request Dec 8, 2021
(cherry picked from commit a1f2ae0; same commit message as above)
catalinii pushed a commit to lyft/spark that referenced this pull request Feb 22, 2022
(cherry picked from commit a1f2ae0; same commit message as above)
catalinii pushed a commit to lyft/spark that referenced this pull request Mar 4, 2022
(cherry picked from commit a1f2ae0; same commit message as above)
5 participants