From c12779bf081072a89a6711090a49924ac6b1ed8e Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Thu, 14 Nov 2024 16:25:03 -0700 Subject: [PATCH 1/3] capacity-limiter-api: Observer all gradient limit changes Motivation: We currently see callbacks to Observer.onLimitChange(..) only for the successful case, but it's perhaps even more important for the limit decrease case. Modifications: - Call the callbacks whenever the limit changes. - Also call them on the failure case. Since we don't use the gradient and RTT params, we pass in -1.0. This also requires a change in the javadoc. - Add some tests to make sure the observers are called. --- servicetalk-capacity-limiter-api/build.gradle | 3 +- .../limiter/api/GradientCapacityLimiter.java | 40 ++++++++++++------- .../api/GradientCapacityLimiterBuilder.java | 9 +++-- .../api/GradientCapacityLimiterTest.java | 36 ++++++++++++++++- 4 files changed, 68 insertions(+), 20 deletions(-) diff --git a/servicetalk-capacity-limiter-api/build.gradle b/servicetalk-capacity-limiter-api/build.gradle index 3976e6ea1c..141a14eb4c 100644 --- a/servicetalk-capacity-limiter-api/build.gradle +++ b/servicetalk-capacity-limiter-api/build.gradle @@ -26,6 +26,7 @@ dependencies { testImplementation enforcedPlatform("org.junit:junit-bom:$junit5Version") testImplementation testFixtures(project(":servicetalk-concurrent-internal")) testImplementation project(":servicetalk-test-resources") - testImplementation "org.junit.jupiter:junit-jupiter-api" testImplementation "org.hamcrest:hamcrest:$hamcrestVersion" + testImplementation "org.junit.jupiter:junit-jupiter-api" + testImplementation "org.mockito:mockito-core:$mockitoCoreVersion" } diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java index 84e18ff0c7..3c8d99adad 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java @@ -149,7 +149,7 @@ public Ticket tryAcquire(final Classification classification, @Nullable final Co /** * Needs to be called while holding the lock. */ - private int updateLimit(final long timestampNs, final double shortLatencyMillis, final double longLatencyMillis) { + private double updateLimit(final long timestampNs, final double shortLatencyMillis, final double longLatencyMillis) { assert lock.isHeldByCurrentThread(); if (isNaN(longLatencyMillis) || isNaN(shortLatencyMillis) || shortLatencyMillis == 0) { return -1; @@ -169,47 +169,57 @@ private int updateLimit(final long timestampNs, final double shortLatencyMillis, } final double headroom = gradient >= 1 ? this.headroom.apply(gradient, limit) : 0; - final double oldLimit = limit; - final int newLimit = (int) (limit = min(max, max(min, (gradient * limit) + headroom))); - observer.onLimitChange(longLatencyMillis, shortLatencyMillis, gradient, oldLimit, newLimit); - return newLimit; + limit = min(max, max(min, (gradient * limit) + headroom)); + return gradient; } private int onSuccess(final long durationNs) { final long nowNs = timeSource.getAsLong(); final long rttMillis = NANOSECONDS.toMillis(durationNs); - int newPending; - int limit; + final double longLatencyMillis; + final double shortLatencyMillis; + final int newPending; + final double oldLimit; + final double newLimit; + double gradient = 0.0; lock.lock(); try { - limit = (int) this.limit; - final double longLatencyMillis = longLatency.observe(nowNs, rttMillis); - final double shortLatencyMillis = shortLatency.observe(nowNs, rttMillis); + oldLimit = this.limit; + longLatencyMillis = longLatency.observe(nowNs, rttMillis); + shortLatencyMillis = shortLatency.observe(nowNs, rttMillis); newPending = --pending; if ((nowNs - lastSamplingNs) >= limitUpdateIntervalNs) { - limit = updateLimit(nowNs, shortLatencyMillis, longLatencyMillis); + gradient = updateLimit(nowNs, shortLatencyMillis, longLatencyMillis); } + newLimit = limit; } finally { lock.unlock(); } - + if (oldLimit != newLimit) { + observer.onLimitChange(longLatencyMillis, shortLatencyMillis, gradient, oldLimit, newLimit); + } observer.onActiveRequestsDecr(); - return limit - newPending; + return (int) (newLimit - newPending); } private int onDrop() { int newPending; - double newLimit; - + final double oldLimit; + final double newLimit; lock.lock(); try { + oldLimit = limit; newLimit = limit = max(min, limit * (limit >= max ? backoffRatioOnLimit : backoffRatioOnLoss)); newPending = --pending; } finally { lock.unlock(); } + if (oldLimit != newLimit) { + // latencies and gradient were not involved in the calculations + observer.onLimitChange(0.0 /*longLatencyMillis*/, 0.0 /*shortLatencyMillis*/, -1.0 /*gradient*/, oldLimit, newLimit); + } observer.onActiveRequestsDecr(); return (int) (newLimit - newPending); } diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java index a278001454..850e9c229b 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java @@ -348,10 +348,13 @@ public interface Observer { *

* The rate of reporting to the observer is based on the rate of change to this * {@link CapacityLimiter} and the {@link #limitUpdateInterval(Duration) sampling interval}. - * @param longRtt The exponential moving average stat of request response times. - * @param shortRtt The sampled response time that triggered the limit change. + * @param longRtt The exponential moving average stat of request response times. A negative value means + * response times were not used in the calculation. + * @param shortRtt The sampled response time that triggered the limit change. A negative value means + * * response times were not used in the calculation. * @param gradient The response time gradient (delta) between the long exposed stat (see. longRtt) - * and the sampled response time (see. shortRtt). + * and the sampled response time (see. shortRtt). A negative value means the gradient was not used in the + * calculation. * @param oldLimit The previous limit of the limiter. * @param newLimit The current limit of the limiter. */ diff --git a/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java b/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java index a117f31d02..3aacda9336 100644 --- a/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java +++ b/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java @@ -15,6 +15,8 @@ */ package io.servicetalk.capacity.limiter.api; +import io.servicetalk.capacity.limiter.api.GradientCapacityLimiterBuilder.Observer; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -27,6 +29,12 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.AdditionalMatchers.not; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; class GradientCapacityLimiterTest { @@ -34,6 +42,7 @@ class GradientCapacityLimiterTest { private static final Classification DEFAULT = () -> 0; + private final Observer observer = mock(Observer.class); @Nullable private CapacityLimiter capacityLimiter; @Nullable @@ -47,6 +56,8 @@ void setup() { } capacityLimiter = new GradientCapacityLimiterBuilder() .timeSource(timeSource) + .limitUpdateInterval(Duration.ofMillis(50)) + .observer(observer) .build(); } @@ -54,18 +65,25 @@ void setup() { void canAcquireTicket() { CapacityLimiter.Ticket ticket = capacityLimiter.tryAcquire(DEFAULT, null); assertThat(ticket, notNullValue()); + verify(observer).onActiveRequestsIncr(); + + // release it and make sure we observed the release. + ticket.completed(); + verify(observer).onActiveRequestsDecr(); + verifyNoMoreInteractions(observer); } @Test void capacityCanDepleteToTheMinLimit() { for (;;) { CapacityLimiter.Ticket ticket = capacityLimiter.tryAcquire(DEFAULT, null); - currentTime += Duration.ofMillis(10).toNanos(); + currentTime += Duration.ofMillis(50).toNanos(); int capacity = ticket.failed(SAD_EXCEPTION); if (capacity == DEFAULT_MIN_LIMIT) { break; } } + } @Test @@ -89,4 +107,20 @@ void canRejectTicketAcquisitions() { lastTicket = capacityLimiter.tryAcquire(DEFAULT, null); assertThat(lastTicket, notNullValue()); } + + @Test + void observesLimitChanges() { + CapacityLimiter.Ticket ticket = capacityLimiter.tryAcquire(DEFAULT, null); + currentTime += Duration.ofMillis(10).toNanos(); + ticket.failed(SAD_EXCEPTION); + + // ticket failure will not use gradient + verify(observer).onLimitChange(eq(0.0), eq(0.0), eq(-1.0), eq(100.0), eq(50.0)); + + ticket = capacityLimiter.tryAcquire(DEFAULT, null); + currentTime += Duration.ofMillis(50).toNanos(); + // Ticket success should adjust observation upward. + ticket.completed(); + verify(observer).onLimitChange(not(eq(0.0)), not(eq(0.0)), not(eq(-1.0)), anyDouble(), anyDouble()); + } } From 73b0afda1598289f1f65db929d8c14d2f3002764 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Thu, 14 Nov 2024 16:52:35 -0700 Subject: [PATCH 2/3] Cleanup --- .../capacity/limiter/api/GradientCapacityLimiterTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java b/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java index 3aacda9336..7fabff385f 100644 --- a/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java +++ b/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java @@ -77,13 +77,12 @@ void canAcquireTicket() { void capacityCanDepleteToTheMinLimit() { for (;;) { CapacityLimiter.Ticket ticket = capacityLimiter.tryAcquire(DEFAULT, null); - currentTime += Duration.ofMillis(50).toNanos(); + currentTime += Duration.ofMillis(10).toNanos(); int capacity = ticket.failed(SAD_EXCEPTION); if (capacity == DEFAULT_MIN_LIMIT) { break; } } - } @Test From 23b88cf4ea8815acc84a3a1b9b7cb5b15bb0b8b7 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Thu, 14 Nov 2024 20:13:56 -0700 Subject: [PATCH 3/3] touchups --- .../capacity/limiter/api/GradientCapacityLimiter.java | 3 ++- .../capacity/limiter/api/GradientCapacityLimiterBuilder.java | 3 +++ .../capacity/limiter/api/GradientCapacityLimiterTest.java | 4 ++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java index 3c8d99adad..326451c393 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java @@ -218,7 +218,8 @@ private int onDrop() { if (oldLimit != newLimit) { // latencies and gradient were not involved in the calculations - observer.onLimitChange(0.0 /*longLatencyMillis*/, 0.0 /*shortLatencyMillis*/, -1.0 /*gradient*/, oldLimit, newLimit); + observer.onLimitChange( + -1.0 /*longLatencyMillis*/, -1.0 /*shortLatencyMillis*/, -1.0 /*gradient*/, oldLimit, newLimit); } observer.onActiveRequestsDecr(); return (int) (newLimit - newPending); diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java index 850e9c229b..9acc0d7ad3 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java @@ -329,6 +329,9 @@ private String name() { /** * A state observer for Gradient {@link CapacityLimiter} to monitor internal state changes. + * + * Note: callbacks are not guaranteed to be executed sequentially or in exactly the same order that state changes + * occurred. */ public interface Observer { diff --git a/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java b/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java index 7fabff385f..644d33a9fd 100644 --- a/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java +++ b/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java @@ -114,12 +114,12 @@ void observesLimitChanges() { ticket.failed(SAD_EXCEPTION); // ticket failure will not use gradient - verify(observer).onLimitChange(eq(0.0), eq(0.0), eq(-1.0), eq(100.0), eq(50.0)); + verify(observer).onLimitChange(eq(-1.0), eq(-1.0), eq(-1.0), eq(100.0), eq(50.0)); ticket = capacityLimiter.tryAcquire(DEFAULT, null); currentTime += Duration.ofMillis(50).toNanos(); // Ticket success should adjust observation upward. ticket.completed(); - verify(observer).onLimitChange(not(eq(0.0)), not(eq(0.0)), not(eq(-1.0)), anyDouble(), anyDouble()); + verify(observer).onLimitChange(not(eq(-1.0)), not(eq(-1.0)), not(eq(-1.0)), anyDouble(), anyDouble()); } }