From 67b9b85af04d3ab7b7d07897f9f00f8f46811ba2 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 23 Mar 2018 14:52:02 -0700 Subject: [PATCH 1/5] drop selection through cost balancer --- .../server/coordinator/BalancerStrategy.java | 7 +++ .../coordinator/CostBalancerStrategy.java | 43 +++++++++++++++---- .../server/coordinator/rules/LoadRule.java | 18 +++++--- .../DruidCoordinatorRuleRunnerTest.java | 8 ++-- .../coordinator/rules/LoadRuleTest.java | 7 ++- 5 files changed, 64 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/BalancerStrategy.java index e654cb44ecf4..af7eb8b8ae72 100644 --- a/server/src/main/java/io/druid/server/coordinator/BalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/BalancerStrategy.java @@ -21,7 +21,9 @@ import io.druid.timeline.DataSegment; +import java.util.Iterator; import java.util.List; +import java.util.NavigableSet; public interface BalancerStrategy { @@ -31,5 +33,10 @@ public interface BalancerStrategy BalancerSegmentHolder pickSegmentToMove(List serverHolders); + default Iterator pickServersToDrop(DataSegment toDropSegment, NavigableSet serverHolders) + { + return serverHolders.descendingIterator(); + } + void emitStats(String tier, CoordinatorStats stats, List serverHolderList); } diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java index f241d5fbe09c..28e19b234960 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java @@ -21,6 +21,7 @@ import com.google.common.base.Predicates; import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -32,9 +33,12 @@ import org.joda.time.Interval; import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; import java.util.List; -import java.util.concurrent.Callable; +import java.util.NavigableSet; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; public class CostBalancerStrategy implements BalancerStrategy { @@ -219,6 +223,34 @@ public BalancerSegmentHolder pickSegmentToMove(final List serverHo return sampler.getRandomBalancerSegmentHolder(serverHolders); } + @Override + public Iterator pickServersToDrop(DataSegment toDrop, NavigableSet serverHolders) + { + List>> futures = Lists.newArrayList(); + + for (final ServerHolder server : serverHolders) { + futures.add( + exec.submit( + () -> Pair.of(computeCost(toDrop, server, true), server) + ) + ); + } + + final ListenableFuture>> resultsFuture = Futures.allAsList(futures); + + try { + List> results = resultsFuture.get(); + return results.stream() + .sorted(Comparator.comparingDouble((Pair o) -> o.lhs)) + .map(x -> x.rhs).collect(Collectors.toList()) + .iterator(); + } + catch (Exception e) { + log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit(); + } + return Iterators.emptyIterator(); + } + /** * Calculates the initial cost of the Druid segment configuration. * @@ -341,14 +373,7 @@ protected Pair chooseBestServer( for (final ServerHolder server : serverHolders) { futures.add( exec.submit( - new Callable>() - { - @Override - public Pair call() - { - return Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server); - } - } + () -> Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server) ) ); } diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index 5af4d822c4b3..656aadf33366 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -19,8 +19,9 @@ package io.druid.server.coordinator.rules; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.java.util.common.IAE; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.server.coordinator.BalancerStrategy; import io.druid.server.coordinator.CoordinatorStats; import io.druid.server.coordinator.DruidCluster; import io.druid.server.coordinator.DruidCoordinator; @@ -39,6 +40,7 @@ import java.util.Map; import java.util.NavigableSet; import java.util.Objects; +import java.util.TreeSet; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -209,7 +211,7 @@ private ServerHolder assignPrimary( } /** - * @param stats {@link CoordinatorStats} to accumulate assignment statistics. + * @param stats {@link CoordinatorStats} to accumulate assignment statistics. * @param tierToSkip if not null, this tier will be skipped from doing assignment, use when primary replica was * assigned. */ @@ -320,7 +322,7 @@ private void drop( } else { final int currentReplicantsInTier = entry.getIntValue(); final int numToDrop = currentReplicantsInTier - targetReplicants.getOrDefault(tier, 0); - numDropped = dropForTier(numToDrop, holders, segment); + numDropped = dropForTier(numToDrop, holders, segment, params.getBalancerStrategy()); } stats.addToTieredStat(DROPPED_COUNT, tier, numDropped); @@ -346,13 +348,17 @@ private boolean loadingInProgress(final DruidCluster druidCluster) private static int dropForTier( final int numToDrop, final NavigableSet holdersInTier, - final DataSegment segment + final DataSegment segment, + final BalancerStrategy balancerStrategy ) { int numDropped = 0; - // Use the reverse order to get the holders with least available size first. - final Iterator iterator = holdersInTier.descendingIterator(); + final NavigableSet isServingSubset = + holdersInTier.stream().filter(s -> s.isServingSegment(segment)).collect(Collectors.toCollection(TreeSet::new)); + + final Iterator iterator = balancerStrategy.pickServersToDrop(segment, isServingSubset); + while (numDropped < numToDrop) { if (!iterator.hasNext()) { log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier()); diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 0b7c71ba2ce6..9af1b2c7e6aa 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.druid.client.DruidServer; @@ -761,9 +762,9 @@ public void testDropTooManyInDifferentTiers() ImmutableMap.of( "hot", Stream.of( - new ServerHolder( - server1.toImmutableDruidServer(), - mockPeon + new ServerHolder( + server1.toImmutableDruidServer(), + mockPeon ) ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))), "normal", @@ -942,6 +943,7 @@ public void testDropServerActuallyServesSegment() LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class); EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce(); + EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).anyTimes(); EasyMock.replay(anotherMockPeon); DruidCluster druidCluster = new DruidCluster( diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index 51200461a959..2a52dfafecb4 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -286,6 +286,9 @@ public void testDrop() final LoadQueuePeon mockPeon = createEmptyPeon(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expect(mockBalancerStrategy.pickServersToDrop(EasyMock.anyObject(), EasyMock.anyObject())) + .andDelegateTo(balancerStrategy) + .times(2); EasyMock.replay(throttler, mockPeon, mockBalancerStrategy); LoadRule rule = createLoadRule(ImmutableMap.of( @@ -430,7 +433,9 @@ public void testDropWithNonExistentTier() final LoadQueuePeon mockPeon = createEmptyPeon(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - + EasyMock.expect(mockBalancerStrategy.pickServersToDrop(EasyMock.anyObject(), EasyMock.anyObject())) + .andDelegateTo(balancerStrategy) + .times(1); EasyMock.replay(throttler, mockPeon, mockBalancerStrategy); LoadRule rule = createLoadRule(ImmutableMap.of( From ab010ffd1e5aa571a615b9465c2e29e1a23f8c2a Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 23 Mar 2018 18:01:41 -0700 Subject: [PATCH 2/5] use collections.emptyIterator --- .../io/druid/server/coordinator/CostBalancerStrategy.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java index 28e19b234960..93e7a468ca94 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java @@ -21,7 +21,6 @@ import com.google.common.base.Predicates; import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -33,6 +32,7 @@ import org.joda.time.Interval; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -248,7 +248,7 @@ public Iterator pickServersToDrop(DataSegment toDrop, NavigableSet catch (Exception e) { log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit(); } - return Iterators.emptyIterator(); + return Collections.emptyIterator(); } /** From 5af606151720ad0628dba5cf41d5adda97efc509 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 30 Mar 2018 15:03:31 -0700 Subject: [PATCH 3/5] add test to ensure does not drop from server with larger loading queue with cost balancer --- .../coordinator/CostBalancerStrategy.java | 2 +- .../DruidCoordinatorRuleRunnerTest.java | 116 +++++++++++++++++- 2 files changed, 112 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java index 93e7a468ca94..99b93eca02ce 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java @@ -241,7 +241,7 @@ public Iterator pickServersToDrop(DataSegment toDrop, NavigableSet try { List> results = resultsFuture.get(); return results.stream() - .sorted(Comparator.comparingDouble((Pair o) -> o.lhs)) + .sorted(Comparator.comparingDouble((Pair o) -> o.lhs).reversed()) .map(x -> x.rhs).collect(Collectors.toList()) .iterator(); } diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 9af1b2c7e6aa..5e5e03120e3d 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -713,6 +713,98 @@ public void testDropTooManyInSameTier() EasyMock.verify(mockPeon); } + @Test + public void testDropTooManyInSameTierWithLoadQueue() + { + mockCoordinator(); + mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + mockEmptyPeon(); + + LoadQueuePeon loadingPeon = EasyMock.createMock(LoadQueuePeon.class); + mockLoadingPeon(loadingPeon, availableSegments.size() + 10, availableSegments.size()); + + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Lists.newArrayList( + new IntervalLoadRule( + Intervals.of("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), + ImmutableMap.of("normal", 1) + ), + new IntervalDropRule(Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z")) + ) + ).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DruidServer server1 = new DruidServer( + "serverNorm", + "hostNorm", + null, + 1000, + ServerType.HISTORICAL, + "normal", + 0 + ); + server1.addDataSegment(availableSegments.get(0)); + server1.addDataSegment(availableSegments.get(1)); + + DruidServer server2 = new DruidServer( + "serverNorm2", + "hostNorm2", + null, + 1000, + ServerType.HISTORICAL, + "normal", + 0 + ); + for (DataSegment segment : availableSegments.subList(1, availableSegments.size())) { + server2.addDataSegment(segment); + } + + DruidCluster druidCluster = new DruidCluster( + null, + ImmutableMap.of( + "normal", + Stream.of( + new ServerHolder( + server1.toImmutableDruidServer(), + loadingPeon + ), + new ServerHolder( + server2.toImmutableDruidServer(), + mockPeon + ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) + ) + ); + + SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() + .withDruidCluster(druidCluster) + .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build()) + .withAvailableSegments(availableSegments) + .withDatabaseRuleManager(databaseRuleManager) + .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerStrategy(balancerStrategy) + .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01")) + .build(); + + DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + + Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "normal")); + Assert.assertEquals(12L, stats.getGlobalStat("deletedCount")); + + exec.shutdown(); + EasyMock.verify(mockPeon); + EasyMock.verify(loadingPeon); + } + @Test public void testDropTooManyInDifferentTiers() { @@ -1411,11 +1503,25 @@ private void mockCoordinator() private void mockEmptyPeon() { - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(new HashSet<>()).anyTimes(); - EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(new HashSet<>()).anyTimes(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes(); - EasyMock.replay(mockPeon); + mockEmptyPeon(mockPeon); + } + + private void mockEmptyPeon(LoadQueuePeon peon) + { + EasyMock.expect(peon.getSegmentsToLoad()).andReturn(new HashSet<>()).anyTimes(); + EasyMock.expect(peon.getSegmentsMarkedToDrop()).andReturn(new HashSet<>()).anyTimes(); + EasyMock.expect(peon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); + EasyMock.expect(peon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes(); + EasyMock.replay(peon); + } + + private void mockLoadingPeon(LoadQueuePeon peon, long size, int segments) + { + EasyMock.expect(peon.getSegmentsToLoad()).andReturn(new HashSet<>()).anyTimes(); + EasyMock.expect(peon.getSegmentsMarkedToDrop()).andReturn(new HashSet<>()).anyTimes(); + EasyMock.expect(peon.getLoadQueueSize()).andReturn(size).atLeastOnce(); + EasyMock.expect(peon.getNumberOfSegmentsInQueue()).andReturn(segments).anyTimes(); + EasyMock.replay(peon); } private CoordinatorDynamicConfig createCoordinatorDynamicConfig() From 268abfa79b2a97c2e3e3458dc3f32a74b1c61e31 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 2 Apr 2018 13:44:05 -0700 Subject: [PATCH 4/5] javadocs and comments to clear things up --- .../server/coordinator/BalancerStrategy.java | 39 +++++++++++++++++++ .../coordinator/CostBalancerStrategy.java | 3 ++ .../server/coordinator/rules/LoadRule.java | 6 +++ 3 files changed, 48 insertions(+) diff --git a/server/src/main/java/io/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/BalancerStrategy.java index af7eb8b8ae72..f2aa695ac0f8 100644 --- a/server/src/main/java/io/druid/server/coordinator/BalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/BalancerStrategy.java @@ -25,18 +25,57 @@ import java.util.List; import java.util.NavigableSet; +/** + * This interface describes the coordinator balancing strategy, which is responsible for making decisions on where + * to place {@link DataSegment}s on historical servers (described by {@link ServerHolder}). The balancing strategy + * is used by {@link io.druid.server.coordinator.rules.LoadRule} to assign and drop segments, and by + * {@link io.druid.server.coordinator.helper.DruidCoordinatorBalancer} to migrate segments between historicals. + */ public interface BalancerStrategy { + /** + * Find the best server to move a {@link DataSegment} to according the the balancing strategy. + * @param proposalSegment segment to move + * @param serverHolders servers to consider as move destinations + * @return + */ ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List serverHolders); + /** + * Find the best server on which to place a {@link DataSegment} replica according to the balancing strategy + * @param proposalSegment segment to replicate + * @param serverHolders servers to consider as replica holders + * @return + */ ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List serverHolders); + /** + * Pick the best segment to move from one of the supplied set of servers according to the balancing strategy. + * @param serverHolders set of historicals to consider for moving segments + * @return {@link BalancerSegmentHolder} containing segment to move and server it current resides on + */ BalancerSegmentHolder pickSegmentToMove(List serverHolders); + /** + * Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first + * for a given drop strategy. One or more segments may be dropped, depending on how much the segment is + * over-replicated. + * @param toDropSegment segment to drop from one or more servers + * @param serverHolders set of historicals to consider dropping from + * @return Iterator for set of historicals, ordered by drop preference + */ default Iterator pickServersToDrop(DataSegment toDropSegment, NavigableSet serverHolders) { + // By default, use the reverse order to get the holders with least available size first. return serverHolders.descendingIterator(); } + /** + * Add balancing strategy stats during the 'balanceTier' operation of + * {@link io.druid.server.coordinator.helper.DruidCoordinatorBalancer} to be included + * @param tier historical tier being balanced + * @param stats stats object to add balancing strategy stats to + * @param serverHolderList servers in tier being balanced + */ void emitStats(String tier, CoordinatorStats stats, List serverHolderList); } diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java index 99b93eca02ce..c5ea85096f84 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java @@ -239,8 +239,11 @@ public Iterator pickServersToDrop(DataSegment toDrop, NavigableSet final ListenableFuture>> resultsFuture = Futures.allAsList(futures); try { + // results is an un-ordered list of a pair consisting of the 'cost' of a segment being on a server and the server List> results = resultsFuture.get(); return results.stream() + // Comparator.comapringDouble will order by lowest cost... + // reverse it because we want to drop from the highest cost servers first .sorted(Comparator.comparingDouble((Pair o) -> o.lhs).reversed()) .map(x -> x.rhs).collect(Collectors.toList()) .iterator(); diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index 367d02057a9f..094deddca60b 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -370,6 +370,12 @@ private static int dropForTier( if (holder.isServingSegment(segment)) { holder.getPeon().dropSegment(segment, null); ++numDropped; + } else { + log.warn( + "Server [%s] is no longer serving segment [%s], skipping drop.", + holder.getServer().getName(), + segment.getIdentifier() + ); } } From 106d8349f2a2b06125dc598ce1463ddca614201d Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 2 Apr 2018 14:11:21 -0700 Subject: [PATCH 5/5] random drop for completeness --- .../io/druid/server/coordinator/BalancerStrategy.java | 4 ++-- .../druid/server/coordinator/RandomBalancerStrategy.java | 9 +++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/BalancerStrategy.java index f2aa695ac0f8..ec498f1154a4 100644 --- a/server/src/main/java/io/druid/server/coordinator/BalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/BalancerStrategy.java @@ -37,7 +37,7 @@ public interface BalancerStrategy * Find the best server to move a {@link DataSegment} to according the the balancing strategy. * @param proposalSegment segment to move * @param serverHolders servers to consider as move destinations - * @return + * @return The server to move to, or null if no move should be made or no server is suitable */ ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List serverHolders); @@ -45,7 +45,7 @@ public interface BalancerStrategy * Find the best server on which to place a {@link DataSegment} replica according to the balancing strategy * @param proposalSegment segment to replicate * @param serverHolders servers to consider as replica holders - * @return + * @return The server to replicate to, or null if no suitable server is found */ ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List serverHolders); diff --git a/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategy.java index 8c2aed3397ab..092811831198 100644 --- a/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategy.java @@ -21,7 +21,10 @@ import io.druid.timeline.DataSegment; +import java.util.Comparator; +import java.util.Iterator; import java.util.List; +import java.util.NavigableSet; import java.util.Random; public class RandomBalancerStrategy implements BalancerStrategy @@ -54,6 +57,12 @@ public BalancerSegmentHolder pickSegmentToMove(List serverHolders) return sampler.getRandomBalancerSegmentHolder(serverHolders); } + @Override + public Iterator pickServersToDrop(DataSegment toDropSegment, NavigableSet serverHolders) + { + return serverHolders.stream().sorted(Comparator.comparingDouble(o -> new Random().nextDouble())).iterator(); + } + @Override public void emitStats(String tier, CoordinatorStats stats, List serverHolderList) {