diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index 108950c169a..16aa38228e9 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -34,6 +34,7 @@ import io.grpc.ExperimentalApi; import io.grpc.LoadBalancer; import io.grpc.Status; +import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; import java.net.SocketAddress; import java.util.ArrayList; @@ -72,7 +73,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { private ConnectivityState rawConnectivityState = IDLE; private ConnectivityState concludedState = IDLE; private final boolean enableHappyEyeballs = - GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, false); + GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, true); PickFirstLeafLoadBalancer(Helper helper) { this.helper = checkNotNull(helper, "helper"); @@ -406,7 +407,16 @@ public void run() { } } - scheduleConnectionTask = helper.getSynchronizationContext().schedule( + SynchronizationContext synchronizationContext = null; + try { + synchronizationContext = helper.getSynchronizationContext(); + } catch (NullPointerException e) { + // All helpers should have a sync context, but if one doesn't (ex. user had a custom test) + // we don't want to break previously working functionality. + return; + } + + scheduleConnectionTask = synchronizationContext.schedule( new StartNextConnection(), CONNECTION_DELAY_INTERVAL_MS, TimeUnit.MILLISECONDS, diff --git a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java index 871ee4e9429..ff2e918b654 100644 --- a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java +++ b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java @@ -48,6 +48,7 @@ import io.grpc.LoadBalancerRegistry; import io.grpc.NameResolver.ConfigOrError; import io.grpc.Status; +import io.grpc.SynchronizationContext; import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer; import io.grpc.internal.PickFirstLeafLoadBalancer.PickFirstLeafLoadBalancerConfig; import io.grpc.internal.PickFirstLoadBalancer.PickFirstLoadBalancerConfig; @@ -58,6 +59,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -691,6 +693,16 @@ private static class TestLoadBalancer extends ForwardingLoadBalancer { } private class TestHelper extends ForwardingLoadBalancerHelper { + final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); + + final FakeClock fakeClock = new FakeClock(); + @Override protected Helper delegate() { return null; @@ -705,6 +717,16 @@ public ChannelLogger getChannelLogger() { public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { // noop } + + @Override + public SynchronizationContext getSynchronizationContext() { + return syncContext; + } + + @Override + public ScheduledExecutorService getScheduledExecutorService() { + return fakeClock.getScheduledExecutorService(); + } } private static class TestSubchannel extends Subchannel { diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2Test.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2Test.java index 8d448a049cb..4c0d47be567 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2Test.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2Test.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; import io.grpc.ChannelCredentials; import io.grpc.ManagedChannelBuilder; @@ -38,7 +39,6 @@ import io.grpc.stub.MetadataUtils; import io.grpc.testing.TlsTesting; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Arrays; import org.junit.Test; @@ -139,7 +139,7 @@ protected ManagedChannelBuilder createChannelBuilder() { @Test public void remoteAddr() { InetSocketAddress isa = (InetSocketAddress) obtainRemoteClientAddr(); - assertEquals(InetAddress.getLoopbackAddress(), isa.getAddress()); + assertTrue(isa.getAddress().isLoopbackAddress()); // It should not be the same as the server assertNotEquals(((InetSocketAddress) getListenAddress()).getPort(), isa.getPort()); } @@ -147,7 +147,7 @@ public void remoteAddr() { @Test public void localAddr() throws Exception { InetSocketAddress isa = (InetSocketAddress) obtainLocalServerAddr(); - assertEquals(InetAddress.getLoopbackAddress(), isa.getAddress()); + assertTrue(isa.getAddress().isLoopbackAddress()); assertEquals(((InetSocketAddress) getListenAddress()).getPort(), isa.getPort()); } diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index b827bc7514a..78f4ee28a52 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -217,6 +217,8 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); inOrder.verify(helper, atLeast(0)) .updateBalancingState(eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class)); + inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); + inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); inOrder.verifyNoMoreInteractions(); assertThat(res.getStatus().isOk()).isTrue(); assertThat(subchannels).hasSize(1); @@ -325,6 +327,8 @@ public void lb_working_withoutDefaultTarget() throws Exception { inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); inOrder.verify(helper, atLeast(0)) .updateBalancingState(eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class)); + inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); + inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); inOrder.verifyNoMoreInteractions(); assertThat(res.getStatus().isOk()).isTrue(); diff --git a/util/src/test/java/io/grpc/util/MultiChildLoadBalancerTest.java b/util/src/test/java/io/grpc/util/MultiChildLoadBalancerTest.java index d071d97a369..9e7b96d9546 100644 --- a/util/src/test/java/io/grpc/util/MultiChildLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/MultiChildLoadBalancerTest.java @@ -32,7 +32,6 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.common.collect.Lists; import io.grpc.Attributes; @@ -128,7 +127,7 @@ public void pickAfterResolved() { (TestLb.TestSubchannelPicker) pickerCaptor.getValue(); assertThat(subchannelPicker.getReadySubchannels()).containsExactly(readySubchannel); - verifyNoMoreInteractions(mockHelper); + AbstractTestHelper.verifyNoMoreMeaningfulInteractions(mockHelper); } @Test @@ -192,7 +191,7 @@ public void pickAfterResolvedUpdatedHosts() { verify(mockHelper, times(3)).createSubchannel(any(LoadBalancer.CreateSubchannelArgs.class)); inOrder.verify(mockHelper, times(2)).updateBalancingState(eq(READY), pickerCaptor.capture()); - verifyNoMoreInteractions(mockHelper); + AbstractTestHelper.verifyNoMoreMeaningfulInteractions(mockHelper); } @Test diff --git a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index e061a4d0b65..449230f0f45 100644 --- a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -153,7 +153,7 @@ public void pickAfterResolved() throws Exception { assertEquals(READY, stateCaptor.getAllValues().get(1)); assertThat(getList(pickerCaptor.getValue())).containsExactly(readySubchannel); - verifyNoMoreInteractions(mockHelper); + AbstractTestHelper.verifyNoMoreMeaningfulInteractions(mockHelper); } @Test @@ -234,7 +234,7 @@ public void pickAfterResolvedUpdatedHosts() throws Exception { picker = pickerCaptor.getValue(); assertThat(getList(picker)).containsExactly(oldSubchannel, newSubchannel); - verifyNoMoreInteractions(mockHelper); + AbstractTestHelper.verifyNoMoreMeaningfulInteractions(mockHelper); } @Test @@ -269,7 +269,7 @@ public void pickAfterStateChange() throws Exception { verify(subchannel, atLeastOnce()).requestConnection(); verify(mockHelper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); - verifyNoMoreInteractions(mockHelper); + AbstractTestHelper.verifyNoMoreMeaningfulInteractions(mockHelper); } @Test @@ -355,7 +355,7 @@ public void refreshNameResolutionWhenSubchannelConnectionBroken() { inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); } - verifyNoMoreInteractions(mockHelper); + AbstractTestHelper.verifyNoMoreMeaningfulInteractions(mockHelper); } @Test @@ -432,7 +432,7 @@ public void nameResolutionErrorWithActiveChannels() throws Exception { LoadBalancer.PickResult pickResult2 = pickerCaptor.getValue().pickSubchannel(mockArgs); assertEquals(readySubchannel, pickResult2.getSubchannel()); - verifyNoMoreInteractions(mockHelper); + AbstractTestHelper.verifyNoMoreMeaningfulInteractions(mockHelper); } @Test diff --git a/util/src/testFixtures/java/io/grpc/util/AbstractTestHelper.java b/util/src/testFixtures/java/io/grpc/util/AbstractTestHelper.java index 9c71ee98a92..b0239c56703 100644 --- a/util/src/testFixtures/java/io/grpc/util/AbstractTestHelper.java +++ b/util/src/testFixtures/java/io/grpc/util/AbstractTestHelper.java @@ -18,7 +18,10 @@ import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.common.collect.Maps; import io.grpc.Attributes; @@ -32,12 +35,15 @@ import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelStateListener; +import io.grpc.SynchronizationContext; +import io.grpc.internal.FakeClock; import io.grpc.internal.PickFirstLoadBalancerProvider; import java.net.SocketAddress; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; @@ -60,9 +66,26 @@ public abstract class AbstractTestHelper extends ForwardingLoadBalancerHelper { protected final Map realToMockSubChannelMap = new HashMap<>(); private final Map subchannelStateListeners = Maps.newLinkedHashMap(); + private final FakeClock fakeClock; + private final SynchronizationContext syncContext; public abstract Map, Subchannel> getSubchannelMap(); + public AbstractTestHelper() { + this(new FakeClock(), new SynchronizationContext(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new RuntimeException(e); + } + })); + } + + public AbstractTestHelper(FakeClock fakeClock, SynchronizationContext syncContext) { + super(); + this.fakeClock = fakeClock; + this.syncContext = syncContext; + } + public Map getMockToRealSubChannelMap() { return mockToRealSubChannelMap; } @@ -79,6 +102,18 @@ public Map getSubchannelStateListeners() { return subchannelStateListeners; } + public static final FakeClock.TaskFilter NOT_START_NEXT_CONNECTION = + new FakeClock.TaskFilter() { + @Override + public boolean shouldAccept(Runnable command) { + return !command.toString().contains("StartNextConnection"); + } + }; + + public static int getNumFilteredPendingTasks(FakeClock fakeClock) { + return fakeClock.getPendingTasks(NOT_START_NEXT_CONNECTION).size(); + } + public void deliverSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) { Subchannel realSc = getMockToRealSubChannelMap().get(subchannel); if (realSc == null) { @@ -128,6 +163,16 @@ public void setChannel(Subchannel subchannel, Channel channel) { ((TestSubchannel)subchannel).channel = channel; } + @Override + public SynchronizationContext getSynchronizationContext() { + return syncContext; + } + + @Override + public ScheduledExecutorService getScheduledExecutorService() { + return fakeClock.getScheduledExecutorService(); + } + @Override public String toString() { return "Test Helper"; @@ -148,6 +193,17 @@ public static void refreshInvokedAndUpdateBS(InOrder inOrder, ConnectivityState } } + public static void verifyNoMoreMeaningfulInteractions(Helper helper) { + verify(helper, atLeast(0)).getSynchronizationContext(); + verify(helper, atLeast(0)).getScheduledExecutorService(); + verifyNoMoreInteractions(helper); + } + + public static void verifyNoMoreMeaningfulInteractions(Helper helper, InOrder inOrder) { + inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); + inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); + inOrder.verifyNoMoreInteractions(); + } protected class TestSubchannel extends ForwardingSubchannel { CreateSubchannelArgs args; diff --git a/xds/src/main/java/io/grpc/xds/XdsEndpointResource.java b/xds/src/main/java/io/grpc/xds/XdsEndpointResource.java index 0c7e8f46bcc..770217bfbaa 100644 --- a/xds/src/main/java/io/grpc/xds/XdsEndpointResource.java +++ b/xds/src/main/java/io/grpc/xds/XdsEndpointResource.java @@ -101,7 +101,7 @@ protected EdsUpdate doParse(Args args, Message unpackedMessage) throws ResourceI } private static boolean isEnabledXdsDualStack() { - return GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, false); + return GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, true); } private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment) diff --git a/xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java index 47fd5a84e41..659bacd3626 100644 --- a/xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java @@ -159,7 +159,7 @@ public void pickAfterResolved() throws Exception { assertEquals(READY, stateCaptor.getAllValues().get(1)); assertThat(getList(pickerCaptor.getValue())).containsExactly(readySubchannel); - verifyNoMoreInteractions(helper); + AbstractTestHelper.verifyNoMoreMeaningfulInteractions(helper); } @Test @@ -226,7 +226,7 @@ public void pickAfterResolvedUpdatedHosts() throws Exception { assertThat(getList(pickerCaptor.getValue())).containsExactly(oldSubchannel, newSubchannel); - verifyNoMoreInteractions(helper); + AbstractTestHelper.verifyNoMoreMeaningfulInteractions(helper); } private Subchannel getSubchannel(EquivalentAddressGroup removedEag) { @@ -288,7 +288,7 @@ public void pickAfterStateChange() throws Exception { int expectedCount = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2; verify(subchannel, times(expectedCount)).requestConnection(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); - verifyNoMoreInteractions(helper); + AbstractTestHelper.verifyNoMoreMeaningfulInteractions(helper); } @Test @@ -319,7 +319,7 @@ public void pickAfterConfigChange() { // At this point it should use a ReadyPicker with newConfig pickerCaptor.getValue().pickSubchannel(mockArgs); verify(mockRandom, times(oldConfig.choiceCount + newConfig.choiceCount)).nextInt(1); - verifyNoMoreInteractions(helper); + AbstractTestHelper.verifyNoMoreMeaningfulInteractions(helper); } @Test @@ -375,7 +375,7 @@ public void stayTransientFailureUntilReady() { inOrder.verify(helper).updateBalancingState(eq(READY), isA(ReadyPicker.class)); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); - verifyNoMoreInteractions(helper); + AbstractTestHelper.verifyNoMoreMeaningfulInteractions(helper); } private String getStatusString(SubchannelPicker picker) { @@ -426,7 +426,7 @@ public void refreshNameResolutionWhenSubchannelConnectionBroken() { inOrder.verify(helper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); } - verifyNoMoreInteractions(helper); + AbstractTestHelper.verifyNoMoreMeaningfulInteractions(helper); } @Test @@ -548,7 +548,7 @@ public void nameResolutionErrorWithActiveChannels() throws Exception { LoadBalancer.PickResult pickResult2 = pickerCaptor.getValue().pickSubchannel(mockArgs); verify(mockRandom, times(choiceCount * 2)).nextInt(1); assertEquals(readySubchannel, pickResult2.getSubchannel()); - verifyNoMoreInteractions(helper); + AbstractTestHelper.verifyNoMoreMeaningfulInteractions(helper); } @Test diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index 19383df72b9..d567dea3e36 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -59,6 +59,7 @@ import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.SynchronizationContext; +import io.grpc.internal.FakeClock; import io.grpc.internal.PickFirstLoadBalancerProvider; import io.grpc.internal.PickSubchannelArgsImpl; import io.grpc.testing.TestMethodDescriptors; @@ -161,7 +162,7 @@ public void subchannelLazyConnectUntilPicked() { verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getSubchannel()).isSameInstanceAs(subchannel); - verifyNoMoreInteractions(helper); + AbstractTestHelper.verifyNoMoreMeaningfulInteractions(helper); } @Test @@ -1055,6 +1056,9 @@ public String toString() { } private class TestHelper extends AbstractTestHelper { + public TestHelper() { + super(new FakeClock(), syncContext); + } @Override public Map, Subchannel> getSubchannelMap() { @@ -1066,11 +1070,6 @@ public String getAuthority() { return AUTHORITY; } - @Override - public SynchronizationContext getSynchronizationContext() { - return syncContext; - } - private Subchannel getMockSubchannel(Subchannel realSubchannel) { return realToMockSubChannelMap.get(realSubchannel); } diff --git a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java index 41847d21d87..63077ddcf69 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java @@ -22,6 +22,8 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -50,6 +52,7 @@ import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.internal.FakeClock; +import io.grpc.internal.GrpcUtil; import io.grpc.internal.TestUtils; import io.grpc.services.InternalCallMetricRecorder; import io.grpc.services.MetricReport; @@ -71,7 +74,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; @@ -94,8 +96,8 @@ public class WeightedRoundRobinLoadBalancerTest { @Rule public final MockitoRule mockito = MockitoJUnit.rule(); - private final TestHelper testHelperInstance = new TestHelper(); - private Helper helper = mock(Helper.class, delegatesTo(testHelperInstance)); + private final TestHelper testHelperInstance; + private final Helper helper; @Mock private LoadBalancer.PickSubchannelArgs mockArgs; @@ -134,6 +136,11 @@ public void uncaughtException(Thread t, Throwable e) { } }); + public WeightedRoundRobinLoadBalancerTest() { + testHelperInstance = new TestHelper(); + helper = mock(Helper.class, delegatesTo(testHelperInstance)); + } + @Before public void setup() { for (int i = 0; i < 3; i++) { @@ -161,6 +168,7 @@ public ClientCall answer( new FakeRandom(0)); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + reset(helper); } @Test @@ -184,9 +192,9 @@ public void wrrLifeCycle() { syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) .setAttributes(affinity).build())); - verify(helper, times(6)).createSubchannel( + verify(helper, times(3)).createSubchannel( any(CreateSubchannelArgs.class)); - assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); + assertThat(getNumFilteredPendingTasks()).isEqualTo(1); Iterator it = subchannels.values().iterator(); Subchannel readySubchannel1 = it.next(); @@ -219,7 +227,8 @@ weightedChild1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).on weightedChild2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); - assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); + int expectedTasks = isEnabledHappyEyeballs() ? 2 : 1; + assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(expectedTasks); assertThat(getAddressesFromPick(weightedPicker)).isEqualTo(weightedChild1.getEag()); assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); @@ -229,13 +238,13 @@ weightedChild2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).on syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) .setAttributes(affinity).build())); - assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); + assertThat(getNumFilteredPendingTasks()).isEqualTo(1); syncContext.execute(() -> wrr.shutdown()); for (Subchannel subchannel: subchannels.values()) { verify(subchannel).shutdown(); } - assertThat(fakeClock.getPendingTasks().size()).isEqualTo(0); + assertThat(getNumFilteredPendingTasks()).isEqualTo(0); verifyNoMoreInteractions(mockArgs); } @@ -252,7 +261,7 @@ public void enableOobLoadReportConfig() { syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) .setAttributes(affinity).build())); - verify(helper, times(6)).createSubchannel( + verify(helper, times(3)).createSubchannel( any(CreateSubchannelArgs.class)); Iterator it = subchannels.values().iterator(); Subchannel readySubchannel1 = it.next(); @@ -273,7 +282,8 @@ weightedChild1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).on weightedChild2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( 0.9, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); - assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); + int expectedTasks = isEnabledHappyEyeballs() ? 2 : 1; + assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(expectedTasks); PickResult pickResult = weightedPicker.pickSubchannel(mockArgs); assertThat(getAddresses(pickResult)) .isEqualTo(weightedChild1.getEag()); @@ -306,9 +316,9 @@ private void pickByWeight(MetricReport r1, MetricReport r2, MetricReport r3, syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) .setAttributes(affinity).build())); - verify(helper, times(6)).createSubchannel( + verify(helper, times(3)).createSubchannel( any(CreateSubchannelArgs.class)); - assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); + assertThat(getNumFilteredPendingTasks()).isEqualTo(1); Iterator it = subchannels.values().iterator(); Subchannel readySubchannel1 = it.next(); @@ -489,19 +499,20 @@ public void emptyConfig() { assertThat(wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(null) .setAttributes(affinity).build()).isOk()).isFalse(); - verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper, never()).createSubchannel(any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(fakeClock.getPendingTasks()).isEmpty(); syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) .setAttributes(affinity).build())); - verify(helper, times(6)).createSubchannel( + verify(helper, times(3)).createSubchannel( any(CreateSubchannelArgs.class)); verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); assertThat(pickerCaptor.getValue().getClass().getName()) .isEqualTo("io.grpc.util.RoundRobinLoadBalancer$EmptyPicker"); - assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); + int expectedCount = isEnabledHappyEyeballs() ? servers.size() + 1 : 1; + assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo( expectedCount); } @Test @@ -509,9 +520,8 @@ public void blackoutPeriod() { syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) .setAttributes(affinity).build())); - verify(helper, times(6)).createSubchannel( - any(CreateSubchannelArgs.class)); - assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + assertThat(getNumFilteredPendingTasks()).isEqualTo(1); Iterator it = subchannels.values().iterator(); Subchannel readySubchannel1 = it.next(); @@ -532,7 +542,8 @@ weightedChild1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).on weightedChild2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); - assertThat(fakeClock.forwardTime(5, TimeUnit.SECONDS)).isEqualTo(1); + int expectedCount = isEnabledHappyEyeballs() ? 2 : 1; + assertThat(fakeClock.forwardTime(5, TimeUnit.SECONDS)).isEqualTo(expectedCount); Map pickCount = new HashMap<>(); for (int i = 0; i < 10000; i++) { EquivalentAddressGroup result = getAddressesFromPick(weightedPicker); @@ -557,14 +568,18 @@ weightedChild2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).on .isLessThan(0.002); } + private boolean isEnabledHappyEyeballs() { + return GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS", true); + } + @Test public void updateWeightTimer() { syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) .setAttributes(affinity).build())); - verify(helper, times(6)).createSubchannel( + verify(helper, times(3)).createSubchannel( any(CreateSubchannelArgs.class)); - assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); + assertThat(getNumFilteredPendingTasks()).isEqualTo(1); Iterator it = subchannels.values().iterator(); Subchannel readySubchannel1 = it.next(); @@ -592,17 +607,18 @@ weightedChild1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).on weightedChild2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); - assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); + int expectedTasks = isEnabledHappyEyeballs() ? 2 : 1; + assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(expectedTasks); assertThat(getAddressesFromPick(weightedPicker)) .isEqualTo(weightedChild1.getEag()); - assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); + assertThat(getNumFilteredPendingTasks()).isEqualTo(1); weightedConfig = WeightedRoundRobinLoadBalancerConfig.newBuilder() .setWeightUpdatePeriodNanos(500_000_000L) //.5s .build(); syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) .setAttributes(affinity).build())); - assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); + assertThat(getNumFilteredPendingTasks()).isEqualTo(1); weightedChild1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); @@ -610,7 +626,8 @@ weightedChild2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).on InternalCallMetricRecorder.createMetricReport( 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); //timer fires, new weight updated - assertThat(fakeClock.forwardTime(500, TimeUnit.MILLISECONDS)).isEqualTo(1); + expectedTasks = isEnabledHappyEyeballs() ? 2 : 1; + assertThat(fakeClock.forwardTime(500, TimeUnit.MILLISECONDS)).isEqualTo(expectedTasks); assertThat(getAddressesFromPick(weightedPicker)) .isEqualTo(weightedChild2.getEag()); assertThat(getAddressesFromPick(weightedPicker)) @@ -622,9 +639,9 @@ public void weightExpired() { syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) .setAttributes(affinity).build())); - verify(helper, times(6)).createSubchannel( + verify(helper, times(3)).createSubchannel( any(CreateSubchannelArgs.class)); - assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); + assertThat(getNumFilteredPendingTasks()).isEqualTo(1); Iterator it = subchannels.values().iterator(); Subchannel readySubchannel1 = it.next(); @@ -645,7 +662,8 @@ weightedChild1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).on weightedChild2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); - assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); + int expectedTasks = isEnabledHappyEyeballs() ? 2 : 1; + assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(expectedTasks); Map pickCount = new HashMap<>(); for (int i = 0; i < 1000; i++) { EquivalentAddressGroup result = getAddressesFromPick(weightedPicker); @@ -676,9 +694,9 @@ public void rrFallback() { syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) .setAttributes(affinity).build())); - verify(helper, times(6)).createSubchannel( + verify(helper, times(3)).createSubchannel( any(CreateSubchannelArgs.class)); - assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); + assertThat(getNumFilteredPendingTasks()).isEqualTo(1); Iterator it = subchannels.values().iterator(); Subchannel readySubchannel1 = it.next(); @@ -691,7 +709,8 @@ public void rrFallback() { eq(ConnectivityState.READY), pickerCaptor.capture()); WeightedRoundRobinPicker weightedPicker = (WeightedRoundRobinPicker) pickerCaptor.getAllValues().get(1); - assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); + int expectedTasks = isEnabledHappyEyeballs() ? 2 : 1; + assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(expectedTasks); WeightedChildLbState weightedChild1 = (WeightedChildLbState) getChild(weightedPicker, 0); WeightedChildLbState weightedChild2 = (WeightedChildLbState) getChild(weightedPicker, 1); Map qpsByChannel = ImmutableMap.of(weightedChild1.getEag(), 2, @@ -743,9 +762,9 @@ public void unknownWeightIsAvgWeight() { syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) .setAttributes(affinity).build())); - verify(helper, times(6)).createSubchannel( + verify(helper, times(3)).createSubchannel( any(CreateSubchannelArgs.class)); // 3 from setup plus 3 from the execute - assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); + assertThat(getNumFilteredPendingTasks()).isEqualTo(1); Iterator it = subchannels.values().iterator(); Subchannel readySubchannel1 = it.next(); @@ -791,9 +810,9 @@ public void pickFromOtherThread() throws Exception { syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) .setAttributes(affinity).build())); - verify(helper, times(6)).createSubchannel( + verify(helper, times(3)).createSubchannel( any(CreateSubchannelArgs.class)); - assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); + assertThat(getNumFilteredPendingTasks()).isEqualTo(1); Iterator it = subchannels.values().iterator(); Subchannel readySubchannel1 = it.next(); @@ -834,7 +853,8 @@ public void run() { } } }).start(); - assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); + int expectedTasks = isEnabledHappyEyeballs() ? 2 : 1; + assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(expectedTasks); barrier.await(); for (int i = 0; i < 1000; i++) { EquivalentAddressGroup result = getAddresses(weightedPicker.pickSubchannel(mockArgs)); @@ -1101,6 +1121,9 @@ public void removingAddressShutsdownSubchannel() { inOrder.verify(subchannel2).shutdown(); } + private int getNumFilteredPendingTasks() { + return AbstractTestHelper.getNumFilteredPendingTasks(fakeClock); + } private static final class VerifyingScheduler { private final StaticStrideScheduler delegate; @@ -1148,6 +1171,9 @@ public int nextInt() { } private class TestHelper extends AbstractTestHelper { + public TestHelper() { + super(fakeClock, syncContext); + } @Override public Map, Subchannel> getSubchannelMap() { @@ -1163,17 +1189,5 @@ public Map getMockToRealSubChannelMap() { public Map getSubchannelStateListeners() { return subchannelStateListeners; } - - @Override - public SynchronizationContext getSynchronizationContext() { - return syncContext; - } - - @Override - public ScheduledExecutorService getScheduledExecutorService() { - return fakeClock.getScheduledExecutorService(); - } - - } }