From f1e39d75aed4f786950b7dda5fdf15b468022752 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Wed, 8 May 2024 14:26:21 -0600 Subject: [PATCH] loadbalancer-experimental: add support for weights in round robin (#2909) Motivation: We want to support weighted but don't in the round-robin HostSelector. Modifications: Add weight support to round-robin using the static stride algorithm common to the grpc libraries. --- .../license/LICENSE.grpc-java.txt | 202 ++++++++++++++++++ .../loadbalancer/BaseHostSelector.java | 14 ++ .../servicetalk/loadbalancer/P2CSelector.java | 16 -- .../RoundRobinLoadBalancingPolicy.java | 28 ++- .../loadbalancer/RoundRobinSelector.java | 124 ++++++++++- .../loadbalancer/P2CSelectorTest.java | 40 ++-- .../loadbalancer/RoundRobinSelectorTest.java | 89 +++++++- .../loadbalancer/SelectorTestHelpers.java | 10 +- 8 files changed, 466 insertions(+), 57 deletions(-) create mode 100644 servicetalk-loadbalancer-experimental/license/LICENSE.grpc-java.txt diff --git a/servicetalk-loadbalancer-experimental/license/LICENSE.grpc-java.txt b/servicetalk-loadbalancer-experimental/license/LICENSE.grpc-java.txt new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/servicetalk-loadbalancer-experimental/license/LICENSE.grpc-java.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java index cd1b40734d..82e2bd371c 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java @@ -30,6 +30,8 @@ abstract class BaseHostSelector implements HostSelector { + private static final double ACCEPTABLE_PERCENT_ERROR = 0.01; + private final String targetResource; private final List> hosts; BaseHostSelector(final List> hosts, final String targetResource) { @@ -95,6 +97,18 @@ private Single noHostsFailure() { this.getClass(), "selectConnection(...)")); } + static boolean approxEqual(double a, double b) { + return Math.abs(a - b) < ACCEPTABLE_PERCENT_ERROR; + } + + static boolean isNormalized(double[] probabilities) { + double ptotal = 0; + for (double p : probabilities) { + ptotal += p; + } + return approxEqual(ptotal, 1); + } + private static boolean anyHealthy( final List> usedHosts) { for (Host host : usedHosts) { diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/P2CSelector.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/P2CSelector.java index f2644933ee..24a31329ac 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/P2CSelector.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/P2CSelector.java @@ -29,8 +29,6 @@ import java.util.function.Predicate; import javax.annotation.Nullable; -import static java.lang.Math.abs; - /** * This {@link LoadBalancer} selection algorithm is based on work by Michael David Mitzenmacher in The Power of Two * Choices in Randomized Load Balancing. @@ -42,8 +40,6 @@ final class P2CSelector extends BaseHostSelector { private static final Logger LOGGER = LoggerFactory.getLogger(P2CSelector.class); - - private static final double ACCEPTABLE_PERCENT_ERROR = 0.01; private static final EntrySelector EMPTY_SELECTOR = new EqualWeightEntrySelector(0); @Nullable @@ -319,16 +315,4 @@ private AliasTableEntrySelector buildAliasTable(double[] pin) { } return new AliasTableEntrySelector(pout, aliases); } - - private static boolean approxEqual(double a, double b) { - return abs(a - b) < ACCEPTABLE_PERCENT_ERROR; - } - - private static boolean isNormalized(double[] probabilities) { - double ptotal = 0; - for (double p : probabilities) { - ptotal += p; - } - return approxEqual(ptotal, 1); - } } diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancingPolicy.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancingPolicy.java index ef800cdbf2..428ea2739d 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancingPolicy.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancingPolicy.java @@ -35,15 +35,17 @@ public final class RoundRobinLoadBalancingPolicy { private final boolean failOpen; + private final boolean ignoreWeights; - private RoundRobinLoadBalancingPolicy(final boolean failOpen) { + private RoundRobinLoadBalancingPolicy(final boolean failOpen, final boolean ignoreWeights) { this.failOpen = failOpen; + this.ignoreWeights = ignoreWeights; } @Override HostSelector buildSelector(final List> hosts, final String targetResource) { - return new RoundRobinSelector<>(hosts, targetResource, failOpen); + return new RoundRobinSelector<>(hosts, targetResource, failOpen, ignoreWeights); } @Override @@ -61,18 +63,36 @@ public String toString() { */ public static final class Builder { + private static final boolean DEFAULT_IGNORE_WEIGHTS = false; + private boolean failOpen = DEFAULT_FAIL_OPEN_POLICY; + private boolean ignoreWeights = DEFAULT_IGNORE_WEIGHTS; /** * Set whether the host selector should attempt to use an unhealthy {@link Host} as a last resort. * @param failOpen whether the host selector should attempt to use an unhealthy {@link Host} as a last resort. * @return this {@link P2CLoadBalancingPolicy.Builder}. */ - public RoundRobinLoadBalancingPolicy.Builder failOpen(final boolean failOpen) { + public Builder failOpen(final boolean failOpen) { this.failOpen = failOpen; return this; } + /** + * Set whether the host selector should ignore {@link Host}s weight. + * Host weight influences the probability it will be selected to serve a request. The host weight can come + * from many sources including known host capacity, priority groups, and others, so ignoring weight + * information can lead to other features not working properly and should be used with care. + * Defaults to {@value DEFAULT_IGNORE_WEIGHTS}. + * + * @param ignoreWeights whether the host selector should ignore host weight information. + * @return {@code this} + */ + public Builder ignoreWeights(final boolean ignoreWeights) { + this.ignoreWeights = ignoreWeights; + return this; + } + /** * Construct the immutable {@link RoundRobinLoadBalancingPolicy}. * @param the type of the resolved address. @@ -81,7 +101,7 @@ public RoundRobinLoadBalancingPolicy.Builder failOpen(final boolean failOpen) { */ public RoundRobinLoadBalancingPolicy build() { - return new RoundRobinLoadBalancingPolicy<>(failOpen); + return new RoundRobinLoadBalancingPolicy<>(failOpen, ignoreWeights); } } } diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java index 3940ccebb2..d89996ccaa 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java @@ -1,5 +1,5 @@ /* - * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * Copyright © 2023-2024 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.servicetalk.loadbalancer; import io.servicetalk.client.api.LoadBalancedConnection; @@ -25,22 +40,33 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +/** + * A round-robin {@link HostSelector} based on the Static Stride Scheduler used in the grpc suite of libraries. + * @param the concrete type of the resolved address. + * @param the concrete type of the load balanced address. + */ final class RoundRobinSelector extends BaseHostSelector { + private static final int MAX_WEIGHT = 0xffff; + private final AtomicInteger index; + private final Scheduler scheduler; private final boolean failOpen; + private final boolean ignoreWeights; RoundRobinSelector(final List> hosts, final String targetResource, - final boolean failOpen) { - this(new AtomicInteger(), hosts, targetResource, failOpen); + final boolean failOpen, final boolean ignoreWeights) { + this(new AtomicInteger(), hosts, targetResource, failOpen, ignoreWeights); } private RoundRobinSelector(final AtomicInteger index, final List> hosts, - final String targetResource, final boolean failOpen) { + final String targetResource, final boolean failOpen, final boolean ignoreWeights) { super(hosts, targetResource); this.index = index; + this.scheduler = ignoreWeights ? new ConstantScheduler(index, hosts.size()) : buildScheduler(index, hosts()); this.failOpen = failOpen; + this.ignoreWeights = ignoreWeights; } @Override @@ -48,7 +74,7 @@ protected Single selectConnection0( final Predicate selector, @Nullable final ContextMap context, final boolean forceNewConnectionAndReserve) { // try one loop over hosts and if all are expired, give up - final int cursor = (index.getAndIncrement() & Integer.MAX_VALUE) % hosts().size(); + final int cursor = scheduler.nextHost(); Host failOpenHost = null; for (int i = 0; i < hosts().size(); ++i) { // for a particular iteration we maintain a local cursor without contention with other requests @@ -78,6 +104,92 @@ protected Single selectConnection0( @Override public HostSelector rebuildWithHosts(@Nonnull List> hosts) { - return new RoundRobinSelector<>(index, hosts, getTargetResource(), failOpen); + return new RoundRobinSelector<>(index, hosts, getTargetResource(), failOpen, ignoreWeights); + } + + private static Scheduler buildScheduler(AtomicInteger index, List> hosts) { + boolean allEqualWeights = true; + double maxWeight = 0; + for (Host host : hosts) { + double hostWeight = host.weight(); + maxWeight = Math.max(maxWeight, hostWeight); + allEqualWeights = allEqualWeights && approxEqual(hosts.get(0).weight(), hostWeight); + } + + if (allEqualWeights) { + return new ConstantScheduler(index, hosts.size()); + } else { + double scaleFactor = MAX_WEIGHT / maxWeight; + int[] scaledWeights = new int[hosts.size()]; + + for (int i = 0; i < scaledWeights.length; i++) { + // Using ceil ensures a few things: + // - our max weighted element is picked on every round + // - hosts with weights near zero will never be truly zero + // - true zero weights will still be truly zero + scaledWeights[i] = Math.min(MAX_WEIGHT, (int) Math.ceil(hosts.get(i).weight() * scaleFactor)); + } + return new StrideScheduler(index, scaledWeights); + } + } + + private abstract static class Scheduler { + abstract int nextHost(); + } + + private static final class ConstantScheduler extends Scheduler { + + private final AtomicInteger index; + private final int hostsSize; + + ConstantScheduler(AtomicInteger index, int hostsSize) { + this.index = index; + this.hostsSize = hostsSize; + } + + @Override + int nextHost() { + return (int) (Integer.toUnsignedLong(index.getAndIncrement()) % hostsSize); + } + } + + // The stride scheduler is heavily inspired by the Google gRPC implementations with some minor modifications. + // The stride scheduler algorithm is convenient in that it doesn't require synchronization like a priority queue + // based solution would which fits well with highly concurrent nature of our HostSelector abstraction. + // See the java-grpc library for more details: + // https://github.com/grpc/grpc-java/blob/da619e2b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java + private static final class StrideScheduler extends Scheduler { + + private final AtomicInteger index; + private final int[] weights; + + StrideScheduler(AtomicInteger index, int[] weights) { + this.index = index; + this.weights = weights; + } + + @Override + int nextHost() { + while (true) { + long counter = Integer.toUnsignedLong(index.getAndIncrement()); + long pass = counter / weights.length; + int i = (int) counter % weights.length; + // We add a unique offset for each offset which could be anything so long as it's constant throughout + // iteration. This is helpful in the case where weights are [1, .. 1, 5] since the scheduling could + // otherwise look something like this: + // .... + // [t, .., t, t] + // [f, .., f, t] <- we will see all elements false except the last for 5x iterations. + // [f, .., f, t] + // [f, .., f, t] + // [f, .., f, t] + // [t, .., t, t] + // .... + int offset = MAX_WEIGHT / 2 * i; + if ((weights[i] * pass + offset) % MAX_WEIGHT >= MAX_WEIGHT - weights[i]) { + return i; + } + } + } } } diff --git a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/P2CSelectorTest.java b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/P2CSelectorTest.java index abb3067707..7c6291d06c 100644 --- a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/P2CSelectorTest.java +++ b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/P2CSelectorTest.java @@ -32,7 +32,7 @@ import javax.annotation.Nullable; import static io.servicetalk.loadbalancer.SelectorTestHelpers.PREDICATE; -import static io.servicetalk.loadbalancer.SelectorTestHelpers.connections; +import static io.servicetalk.loadbalancer.SelectorTestHelpers.generateHosts; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.either; @@ -71,14 +71,14 @@ void init(List> hosts) { @Test void equalWeightDistribution() throws Exception { - List> hosts = connections(5); + List> hosts = SelectorTestHelpers.generateHosts(5); init(hosts); checkProbabilities(hosts); } @Test void unequalWeightDistribution() throws Exception { - List> hosts = connections(5); + List> hosts = SelectorTestHelpers.generateHosts(5); when(hosts.get(0).weight()).thenReturn(2.0); when(hosts.get(1).weight()).thenReturn(0.5); init(hosts); @@ -87,7 +87,7 @@ void unequalWeightDistribution() throws Exception { @Test void unequalWeightsButWeightsDisabled() throws Exception { - List> hosts = connections(2); + List> hosts = SelectorTestHelpers.generateHosts(2); when(hosts.get(0).weight()).thenReturn(2.0); when(hosts.get(1).weight()).thenReturn(0.5); selector = new P2CSelector<>(hosts, "testResource", true, maxEffort, failOpen, new Random(0L)); @@ -101,7 +101,7 @@ void unequalWeightsButWeightsDisabled() throws Exception { @Test void negativeWeightsTurnIntoUnweightedSelection() throws Exception { - List> hosts = connections(2); + List> hosts = SelectorTestHelpers.generateHosts(2); when(hosts.get(0).weight()).thenReturn(-1.0); init(hosts); int[] counts = runIterations(hosts); @@ -114,7 +114,7 @@ void negativeWeightsTurnIntoUnweightedSelection() throws Exception { @Test void singleHealthyHost() throws Exception { - init(connections("addr-1")); + init(SelectorTestHelpers.generateHosts("addr-1")); TestLoadBalancedConnection connection = selector.selectConnection( PREDICATE, null, false).toFuture().get(); assertThat(connection.address(), equalTo("addr-1")); @@ -123,7 +123,7 @@ void singleHealthyHost() throws Exception { @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}, equalWeights={1}") @CsvSource({"true, true,", "true, false", "false, true", "false, false"}) void emptyHostSet(boolean failOpen, boolean equalWeights) { - List> hosts = connections(equalWeights); + List> hosts = generateHosts(equalWeights); this.failOpen = failOpen; init(hosts); ExecutionException ex = assertThrows(ExecutionException.class, @@ -134,7 +134,7 @@ void emptyHostSet(boolean failOpen, boolean equalWeights) { @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}, equalWeights={1}") @CsvSource({"true, true,", "true, false", "false, true", "false, false"}) void singleUnhealthyHost(boolean failOpen, boolean equalWeights) throws Exception { - List> hosts = connections(equalWeights, "addr-1"); + List> hosts = generateHosts(equalWeights, "addr-1"); when(hosts.get(0).isHealthy()).thenReturn(false); this.failOpen = failOpen; init(hosts); @@ -152,7 +152,7 @@ void singleUnhealthyHost(boolean failOpen, boolean equalWeights) throws Exceptio @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}, equalWeights={1}") @CsvSource({"true, true,", "true, false", "false, true", "false, false"}) void singleInactiveAndUnhealthyHostWithConnection(boolean failOpen, boolean equalWeights) throws Exception { - List> hosts = connections(equalWeights, "addr-1"); + List> hosts = generateHosts(equalWeights, "addr-1"); when(hosts.get(0).isHealthy()).thenReturn(false); when(hosts.get(0).canMakeNewConnections()).thenReturn(false); this.failOpen = failOpen; @@ -171,7 +171,7 @@ void singleInactiveAndUnhealthyHostWithConnection(boolean failOpen, boolean equa @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}, equalWeights={1}") @CsvSource({"true, true,", "true, false", "false, true", "false, false"}) void singleInactiveAndUnhealthyHostWithoutConnection(boolean failOpen, boolean equalWeights) throws Exception { - List> hosts = connections(equalWeights, "addr-1"); + List> hosts = generateHosts(equalWeights, "addr-1"); when(hosts.get(0).isHealthy()).thenReturn(false); when(hosts.get(0).canMakeNewConnections()).thenReturn(false); when(hosts.get(0).pickConnection(PREDICATE, null)).thenReturn(null); @@ -186,7 +186,7 @@ void singleInactiveAndUnhealthyHostWithoutConnection(boolean failOpen, boolean e @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}, equalWeights={1}") @CsvSource({"true, true,", "true, false", "false, true", "false, false"}) void twoHealthyActiveHosts(boolean failOpen, boolean equalWeights) throws Exception { - List> hosts = connections(equalWeights, "addr-1", "addr-2"); + List> hosts = generateHosts(equalWeights, "addr-1", "addr-2"); this.failOpen = failOpen; init(hosts); TestLoadBalancedConnection connection = selector.selectConnection( @@ -197,7 +197,7 @@ void twoHealthyActiveHosts(boolean failOpen, boolean equalWeights) throws Except @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}, equalWeights={1}") @CsvSource({"true, true,", "true, false", "false, true", "false, false"}) void twoHealthyInactiveHostsWithConnections(boolean failOpen, boolean equalWeights) throws Exception { - List> hosts = connections(equalWeights, "addr-1", "addr-2"); + List> hosts = generateHosts(equalWeights, "addr-1", "addr-2"); for (Host host : hosts) { when(host.canMakeNewConnections()).thenReturn(false); } @@ -211,7 +211,7 @@ void twoHealthyInactiveHostsWithConnections(boolean failOpen, boolean equalWeigh @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}, equalWeights={1}") @CsvSource({"true, true,", "true, false", "false, true", "false, false"}) void twoHealthyInactiveHostsWithoutConnections(boolean failOpen, boolean equalWeights) throws Exception { - List> hosts = connections(equalWeights, "addr-1", "addr-2"); + List> hosts = generateHosts(equalWeights, "addr-1", "addr-2"); for (Host host : hosts) { when(host.canMakeNewConnections()).thenReturn(false); when(host.pickConnection(PREDICATE, null)).thenReturn(null); @@ -226,7 +226,7 @@ void twoHealthyInactiveHostsWithoutConnections(boolean failOpen, boolean equalWe @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}, equalWeights={1}") @CsvSource({"true, true,", "true, false", "false, true", "false, false"}) void twoUnHealthyActiveHosts(boolean failOpen, boolean equalWeights) throws Exception { - List> hosts = connections(equalWeights, "addr-1", "addr-2"); + List> hosts = generateHosts(equalWeights, "addr-1", "addr-2"); for (Host host : hosts) { when(host.isHealthy()).thenReturn(false); } @@ -246,7 +246,7 @@ void twoUnHealthyActiveHosts(boolean failOpen, boolean equalWeights) throws Exce @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}, equalWeights={1}") @CsvSource({"true, true,", "true, false", "false, true", "false, false"}) void twoUnHealthyInactiveHosts(boolean failOpen, boolean equalWeights) { - List> hosts = connections(equalWeights, "addr-1", "addr-2"); + List> hosts = generateHosts(equalWeights, "addr-1", "addr-2"); for (Host host : hosts) { when(host.isHealthy()).thenReturn(false); when(host.canMakeNewConnections()).thenReturn(false); @@ -261,7 +261,7 @@ void twoUnHealthyInactiveHosts(boolean failOpen, boolean equalWeights) { @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}, equalWeights={1}") @CsvSource({"true, true,", "true, false", "false, true", "false, false"}) void doesntBiasTowardHostsWithConnections(boolean failOpen, boolean equalWeights) throws Exception { - List> hosts = connections(equalWeights, "addr-1", "addr-2"); + List> hosts = generateHosts(equalWeights, "addr-1", "addr-2"); // we setup the first host to always be preferred by score, but it also doesn't have any connections. when(hosts.get(0).pickConnection(any(), any())).thenReturn(null); when(hosts.get(0).score()).thenReturn(10); @@ -276,7 +276,7 @@ void doesntBiasTowardHostsWithConnections(boolean failOpen, boolean equalWeights @Test void selectsExistingConnectionsFromNonPreferredHost() throws Exception { - List> hosts = connections("addr-1", "addr-2"); + List> hosts = SelectorTestHelpers.generateHosts("addr-1", "addr-2"); // we setup the first host to always be preferred by score, but it also doesn't have any connections // and is unhealthy. when(hosts.get(0).pickConnection(any(), any())).thenReturn(null); @@ -292,7 +292,7 @@ void selectsExistingConnectionsFromNonPreferredHost() throws Exception { @Test void biasesTowardsHealthyHostWhenMakingConnections() throws Exception { - List> hosts = connections("addr-1", "addr-2"); + List> hosts = SelectorTestHelpers.generateHosts("addr-1", "addr-2"); when(hosts.get(0).isHealthy()).thenReturn(false); init(hosts); TestLoadBalancedConnection connection = selector.selectConnection( @@ -303,7 +303,7 @@ void biasesTowardsHealthyHostWhenMakingConnections() throws Exception { @ParameterizedTest(name = "{displayName} [{index}]: forceNewConnection={0}, equalWeights={1}") @CsvSource({"true, true,", "true, false", "false, true", "false, false"}) void biasesTowardTheHighestWeightHost(boolean forceNewConnection, boolean equalWeights) throws Exception { - List> hosts = connections(equalWeights, "addr-1", "addr-2"); + List> hosts = generateHosts(equalWeights, "addr-1", "addr-2"); // Host 0 has the highest score, so it should always get the new connection. when(hosts.get(0).score()).thenReturn(10); init(hosts); @@ -315,7 +315,7 @@ void biasesTowardTheHighestWeightHost(boolean forceNewConnection, boolean equalW @ParameterizedTest(name = "{displayName} [{index}]: unhealthy={0}, equalWeights={1}") @CsvSource({"true, true,", "true, false", "false, true", "false, false"}) void singleInactiveHostFailOpen(boolean unhealthy, boolean equalWeights) { - List> hosts = connections(equalWeights, "addr-1"); + List> hosts = generateHosts(equalWeights, "addr-1"); when(hosts.get(0).isHealthy()).thenReturn(unhealthy); when(hosts.get(0).canMakeNewConnections()).thenReturn(false); when(hosts.get(0).pickConnection(PREDICATE, null)).thenReturn(null); diff --git a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/RoundRobinSelectorTest.java b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/RoundRobinSelectorTest.java index e9eab938a1..d709ad7575 100644 --- a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/RoundRobinSelectorTest.java +++ b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/RoundRobinSelectorTest.java @@ -29,9 +29,9 @@ import javax.annotation.Nullable; import static io.servicetalk.loadbalancer.SelectorTestHelpers.PREDICATE; -import static io.servicetalk.loadbalancer.SelectorTestHelpers.connections; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.isA; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.when; @@ -50,12 +50,12 @@ void setup() { } void init(List> hosts) { - selector = new RoundRobinSelector<>(hosts, "testResource", failOpen); + selector = new RoundRobinSelector<>(hosts, "testResource", failOpen, false); } @Test void roundRobining() throws Exception { - List> hosts = connections("addr-1", "addr-2"); + List> hosts = SelectorTestHelpers.generateHosts("addr-1", "addr-2"); init(hosts); List addresses = new ArrayList<>(); for (int i = 0; i < 5; i++) { @@ -66,9 +66,86 @@ void roundRobining() throws Exception { assertThat(addresses, contains("addr-1", "addr-2", "addr-1", "addr-2", "addr-1")); } + @Test + void roundRobiningWithUnequalWeights() throws Exception { + List> hosts = SelectorTestHelpers.generateHosts( + "addr-1", "addr-2", "addr-3"); + when(hosts.get(0).weight()).thenReturn(1.0); + when(hosts.get(1).weight()).thenReturn(2.0); + when(hosts.get(2).weight()).thenReturn(3.0); + init(hosts); + List addresses = new ArrayList<>(); + for (int i = 0; i < 18; i++) { + TestLoadBalancedConnection connection = selector.selectConnection( + PREDICATE, null, true).toFuture().get(); + addresses.add(connection.address()); + } + + assertThat(addresses.stream().filter("addr-1"::equals).count(), equalTo(3L)); + assertThat(addresses.stream().filter("addr-2"::equals).count(), equalTo(6L)); + assertThat(addresses.stream().filter("addr-3"::equals).count(), equalTo(9L)); + + // The stream of selections should be should be + // addr-1, addr-2, addr-3 + // F T T + // F F T + // T T T + // F T T <- starting repetition + // ... + assertThat(addresses, contains( + "addr-2", "addr-3", "addr-3", + "addr-1", "addr-2", "addr-3", + "addr-2", "addr-3", "addr-3", + "addr-1", "addr-2", "addr-3", + "addr-2", "addr-3", "addr-3", + "addr-1", "addr-2", "addr-3")); + } + + @Test + void unequalWeightsWithATrueZeroWeight() throws Exception { + List> hosts = SelectorTestHelpers.generateHosts("addr-1", "addr-2"); + when(hosts.get(0).weight()).thenReturn(0.0); + when(hosts.get(1).weight()).thenReturn(1.0); + init(hosts); + int[] counts = new int[2]; + for (int i = 0; i < 0xffff + 1; i++) { + TestLoadBalancedConnection connection = selector.selectConnection( + PREDICATE, null, true).toFuture().get(); + if ("addr-1".equals(connection.address())) { + counts[0]++; + } else { + counts[1]++; + } + } + + assertThat(counts[0], equalTo(0)); + assertThat(counts[1], equalTo(0xffff + 1)); + } + + @Test + void unequalWeightsWithNearWeight() throws Exception { + List> hosts = SelectorTestHelpers.generateHosts("addr-1", "addr-2"); + when(hosts.get(0).weight()).thenReturn(1d / (0xffff * 7)); + when(hosts.get(1).weight()).thenReturn(1.0); + init(hosts); + int[] counts = new int[2]; + for (int i = 0; i < 0xffff + 1; i++) { + TestLoadBalancedConnection connection = selector.selectConnection( + PREDICATE, null, true).toFuture().get(); + if ("addr-1".equals(connection.address())) { + counts[0]++; + } else { + counts[1]++; + } + } + + assertThat(counts[0], equalTo(1)); + assertThat(counts[1], equalTo(0xffff)); + } + @Test void skipUnhealthyHosts() throws Exception { - List> hosts = connections("addr-1", "addr-2"); + List> hosts = SelectorTestHelpers.generateHosts("addr-1", "addr-2"); when(hosts.get(0).isHealthy()).thenReturn(false); init(hosts); List addresses = new ArrayList<>(); @@ -83,7 +160,7 @@ void skipUnhealthyHosts() throws Exception { @ParameterizedTest(name = "{displayName} [{index}]: failOpen={0}") @ValueSource(booleans = {false, true}) void noHealthyHosts(boolean failOpen) throws Exception { - List> hosts = connections("addr-1"); + List> hosts = SelectorTestHelpers.generateHosts("addr-1"); when(hosts.get(0).isHealthy()).thenReturn(false); this.failOpen = failOpen; init(hosts); @@ -105,7 +182,7 @@ void noHealthyHosts(boolean failOpen) throws Exception { @ParameterizedTest(name = "{displayName} [{index}]: unhealthy={0} failOpen={1}") @CsvSource({"true,true", "true,false", "false,true", "false,false"}) void singleInactiveHostWithoutConnections(boolean unhealthy, boolean failOpen) { - List> hosts = connections("addr-1"); + List> hosts = SelectorTestHelpers.generateHosts("addr-1"); when(hosts.get(0).canMakeNewConnections()).thenReturn(false); when(hosts.get(0).pickConnection(PREDICATE, null)).thenReturn(null); this.failOpen = failOpen; diff --git a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/SelectorTestHelpers.java b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/SelectorTestHelpers.java index 6f8b4b1429..cc987448a5 100644 --- a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/SelectorTestHelpers.java +++ b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/SelectorTestHelpers.java @@ -34,19 +34,19 @@ final class SelectorTestHelpers { private SelectorTestHelpers() { } - static List> connections(int count) { + static List> generateHosts(int count) { String[] addresses = new String[count]; for (int i = 0; i < count; i++) { addresses[i] = "address-" + i; } - return connections(addresses); + return generateHosts(addresses); } - static List> connections(String... addresses) { - return connections(true, addresses); + static List> generateHosts(String... addresses) { + return generateHosts(true, addresses); } - static List> connections(boolean equalWeights, String... addresses) { + static List> generateHosts(boolean equalWeights, String... addresses) { final List> results = new ArrayList<>(addresses.length); for (String addr : addresses) { Host host = mockHost(addr,