Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

capacity-limiter-api: observer all gradient limit changes #3107

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion servicetalk-capacity-limiter-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
testImplementation enforcedPlatform("org.junit:junit-bom:$junit5Version")
testImplementation testFixtures(project(":servicetalk-concurrent-internal"))
testImplementation project(":servicetalk-test-resources")
testImplementation "org.junit.jupiter:junit-jupiter-api"
testImplementation "org.hamcrest:hamcrest:$hamcrestVersion"
testImplementation "org.junit.jupiter:junit-jupiter-api"
testImplementation "org.mockito:mockito-core:$mockitoCoreVersion"
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public Ticket tryAcquire(final Classification classification, @Nullable final Co
/**
* Needs to be called while holding the lock.
*/
private int updateLimit(final long timestampNs, final double shortLatencyMillis, final double longLatencyMillis) {
private double updateLimit(final long timestampNs, final double shortLatencyMillis, final double longLatencyMillis) {
assert lock.isHeldByCurrentThread();
if (isNaN(longLatencyMillis) || isNaN(shortLatencyMillis) || shortLatencyMillis == 0) {
return -1;
Expand All @@ -169,47 +169,57 @@ private int updateLimit(final long timestampNs, final double shortLatencyMillis,
}

final double headroom = gradient >= 1 ? this.headroom.apply(gradient, limit) : 0;
final double oldLimit = limit;
final int newLimit = (int) (limit = min(max, max(min, (gradient * limit) + headroom)));
observer.onLimitChange(longLatencyMillis, shortLatencyMillis, gradient, oldLimit, newLimit);
return newLimit;
limit = min(max, max(min, (gradient * limit) + headroom));
return gradient;
}

private int onSuccess(final long durationNs) {
final long nowNs = timeSource.getAsLong();
final long rttMillis = NANOSECONDS.toMillis(durationNs);
int newPending;
int limit;
final double longLatencyMillis;
final double shortLatencyMillis;
final int newPending;
final double oldLimit;
final double newLimit;
double gradient = 0.0;
lock.lock();
try {
limit = (int) this.limit;
final double longLatencyMillis = longLatency.observe(nowNs, rttMillis);
final double shortLatencyMillis = shortLatency.observe(nowNs, rttMillis);
oldLimit = this.limit;
longLatencyMillis = longLatency.observe(nowNs, rttMillis);
shortLatencyMillis = shortLatency.observe(nowNs, rttMillis);

newPending = --pending;
if ((nowNs - lastSamplingNs) >= limitUpdateIntervalNs) {
limit = updateLimit(nowNs, shortLatencyMillis, longLatencyMillis);
gradient = updateLimit(nowNs, shortLatencyMillis, longLatencyMillis);
}
newLimit = limit;
} finally {
lock.unlock();
}

if (oldLimit != newLimit) {
observer.onLimitChange(longLatencyMillis, shortLatencyMillis, gradient, oldLimit, newLimit);
}
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved
observer.onActiveRequestsDecr();
return limit - newPending;
return (int) (newLimit - newPending);
}

private int onDrop() {
int newPending;
double newLimit;

final double oldLimit;
final double newLimit;
lock.lock();
try {
oldLimit = limit;
newLimit = limit = max(min, limit * (limit >= max ? backoffRatioOnLimit : backoffRatioOnLoss));
newPending = --pending;
} finally {
lock.unlock();
}

if (oldLimit != newLimit) {
// latencies and gradient were not involved in the calculations
observer.onLimitChange(0.0 /*longLatencyMillis*/, 0.0 /*shortLatencyMillis*/, -1.0 /*gradient*/, oldLimit, newLimit);
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved
}
observer.onActiveRequestsDecr();
return (int) (newLimit - newPending);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,13 @@ public interface Observer {
* <p>
* The rate of reporting to the observer is based on the rate of change to this
* {@link CapacityLimiter} and the {@link #limitUpdateInterval(Duration) sampling interval}.
* @param longRtt The exponential moving average stat of request response times.
* @param shortRtt The sampled response time that triggered the limit change.
* @param longRtt The exponential moving average stat of request response times. A negative value means
* response times were not used in the calculation.
* @param shortRtt The sampled response time that triggered the limit change. A negative value means
* * response times were not used in the calculation.
* @param gradient The response time gradient (delta) between the long exposed stat (see. longRtt)
* and the sampled response time (see. shortRtt).
* and the sampled response time (see. shortRtt). A negative value means the gradient was not used in the
* calculation.
* @param oldLimit The previous limit of the limiter.
* @param newLimit The current limit of the limiter.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.servicetalk.capacity.limiter.api;

import io.servicetalk.capacity.limiter.api.GradientCapacityLimiterBuilder.Observer;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -27,13 +29,20 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.AdditionalMatchers.not;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

class GradientCapacityLimiterTest {

private static final Exception SAD_EXCEPTION = new Exception("sad");

private static final Classification DEFAULT = () -> 0;

private final Observer observer = mock(Observer.class);
@Nullable
private CapacityLimiter capacityLimiter;
@Nullable
Expand All @@ -47,25 +56,34 @@ void setup() {
}
capacityLimiter = new GradientCapacityLimiterBuilder()
.timeSource(timeSource)
.limitUpdateInterval(Duration.ofMillis(50))
.observer(observer)
.build();
}

@Test
void canAcquireTicket() {
CapacityLimiter.Ticket ticket = capacityLimiter.tryAcquire(DEFAULT, null);
assertThat(ticket, notNullValue());
verify(observer).onActiveRequestsIncr();

// release it and make sure we observed the release.
ticket.completed();
verify(observer).onActiveRequestsDecr();
verifyNoMoreInteractions(observer);
}

@Test
void capacityCanDepleteToTheMinLimit() {
for (;;) {
CapacityLimiter.Ticket ticket = capacityLimiter.tryAcquire(DEFAULT, null);
currentTime += Duration.ofMillis(10).toNanos();
currentTime += Duration.ofMillis(50).toNanos();
int capacity = ticket.failed(SAD_EXCEPTION);
if (capacity == DEFAULT_MIN_LIMIT) {
break;
}
}

}

@Test
Expand All @@ -89,4 +107,20 @@ void canRejectTicketAcquisitions() {
lastTicket = capacityLimiter.tryAcquire(DEFAULT, null);
assertThat(lastTicket, notNullValue());
}

@Test
void observesLimitChanges() {
CapacityLimiter.Ticket ticket = capacityLimiter.tryAcquire(DEFAULT, null);
currentTime += Duration.ofMillis(10).toNanos();
ticket.failed(SAD_EXCEPTION);

// ticket failure will not use gradient
verify(observer).onLimitChange(eq(0.0), eq(0.0), eq(-1.0), eq(100.0), eq(50.0));

ticket = capacityLimiter.tryAcquire(DEFAULT, null);
currentTime += Duration.ofMillis(50).toNanos();
// Ticket success should adjust observation upward.
ticket.completed();
verify(observer).onLimitChange(not(eq(0.0)), not(eq(0.0)), not(eq(-1.0)), anyDouble(), anyDouble());
}
}
Loading