-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Coordinator drop segment selection through cost balancer #5529
Changes from 3 commits
67b9b85
ab010ff
5af6061
7b2bd2e
268abfa
106d834
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,7 +21,9 @@ | |
|
||
import io.druid.timeline.DataSegment; | ||
|
||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.NavigableSet; | ||
|
||
public interface BalancerStrategy | ||
{ | ||
|
@@ -31,5 +33,10 @@ public interface BalancerStrategy | |
|
||
BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders); | ||
|
||
default Iterator<ServerHolder> pickServersToDrop(DataSegment toDropSegment, NavigableSet<ServerHolder> serverHolders) | ||
{ | ||
return serverHolders.descendingIterator(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Would be nice to include a comment about why this descendingIterator is meaningful. (Like the comment on the old code: "Use the reverse order to get the holders with least available size first.") |
||
} | ||
|
||
void emitStats(String tier, CoordinatorStats stats, List<ServerHolder> serverHolderList); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,9 +32,13 @@ | |
import org.joda.time.Interval; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.concurrent.Callable; | ||
import java.util.NavigableSet; | ||
import java.util.concurrent.ThreadLocalRandom; | ||
import java.util.stream.Collectors; | ||
|
||
public class CostBalancerStrategy implements BalancerStrategy | ||
{ | ||
|
@@ -219,6 +223,34 @@ public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHo | |
return sampler.getRandomBalancerSegmentHolder(serverHolders); | ||
} | ||
|
||
@Override | ||
public Iterator<ServerHolder> pickServersToDrop(DataSegment toDrop, NavigableSet<ServerHolder> serverHolders) | ||
{ | ||
List<ListenableFuture<Pair<Double, ServerHolder>>> futures = Lists.newArrayList(); | ||
|
||
for (final ServerHolder server : serverHolders) { | ||
futures.add( | ||
exec.submit( | ||
() -> Pair.of(computeCost(toDrop, server, true), server) | ||
) | ||
); | ||
} | ||
|
||
final ListenableFuture<List<Pair<Double, ServerHolder>>> resultsFuture = Futures.allAsList(futures); | ||
|
||
try { | ||
List<Pair<Double, ServerHolder>> results = resultsFuture.get(); | ||
return results.stream() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This code is slightly obtuse (what's the double? why reverse it?) and would benefit from a comment. |
||
.sorted(Comparator.comparingDouble((Pair<Double, ServerHolder> o) -> o.lhs).reversed()) | ||
.map(x -> x.rhs).collect(Collectors.toList()) | ||
.iterator(); | ||
} | ||
catch (Exception e) { | ||
log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit(); | ||
} | ||
return Collections.emptyIterator(); | ||
} | ||
|
||
/** | ||
* Calculates the initial cost of the Druid segment configuration. | ||
* | ||
|
@@ -341,14 +373,7 @@ protected Pair<Double, ServerHolder> chooseBestServer( | |
for (final ServerHolder server : serverHolders) { | ||
futures.add( | ||
exec.submit( | ||
new Callable<Pair<Double, ServerHolder>>() | ||
{ | ||
@Override | ||
public Pair<Double, ServerHolder> call() | ||
{ | ||
return Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server); | ||
} | ||
} | ||
() -> Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server) | ||
) | ||
); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,8 +19,9 @@ | |
|
||
package io.druid.server.coordinator.rules; | ||
|
||
import io.druid.java.util.emitter.EmittingLogger; | ||
import io.druid.java.util.common.IAE; | ||
import io.druid.java.util.emitter.EmittingLogger; | ||
import io.druid.server.coordinator.BalancerStrategy; | ||
import io.druid.server.coordinator.CoordinatorStats; | ||
import io.druid.server.coordinator.DruidCluster; | ||
import io.druid.server.coordinator.DruidCoordinator; | ||
|
@@ -39,6 +40,7 @@ | |
import java.util.Map; | ||
import java.util.NavigableSet; | ||
import java.util.Objects; | ||
import java.util.TreeSet; | ||
import java.util.function.Predicate; | ||
import java.util.stream.Collectors; | ||
|
||
|
@@ -209,7 +211,7 @@ private ServerHolder assignPrimary( | |
} | ||
|
||
/** | ||
* @param stats {@link CoordinatorStats} to accumulate assignment statistics. | ||
* @param stats {@link CoordinatorStats} to accumulate assignment statistics. | ||
* @param tierToSkip if not null, this tier will be skipped from doing assignment, use when primary replica was | ||
* assigned. | ||
*/ | ||
|
@@ -320,7 +322,7 @@ private void drop( | |
} else { | ||
final int currentReplicantsInTier = entry.getIntValue(); | ||
final int numToDrop = currentReplicantsInTier - targetReplicants.getOrDefault(tier, 0); | ||
numDropped = dropForTier(numToDrop, holders, segment); | ||
numDropped = dropForTier(numToDrop, holders, segment, params.getBalancerStrategy()); | ||
} | ||
|
||
stats.addToTieredStat(DROPPED_COUNT, tier, numDropped); | ||
|
@@ -346,13 +348,17 @@ private boolean loadingInProgress(final DruidCluster druidCluster) | |
private static int dropForTier( | ||
final int numToDrop, | ||
final NavigableSet<ServerHolder> holdersInTier, | ||
final DataSegment segment | ||
final DataSegment segment, | ||
final BalancerStrategy balancerStrategy | ||
) | ||
{ | ||
int numDropped = 0; | ||
|
||
// Use the reverse order to get the holders with least available size first. | ||
final Iterator<ServerHolder> iterator = holdersInTier.descendingIterator(); | ||
final NavigableSet<ServerHolder> isServingSubset = | ||
holdersInTier.stream().filter(s -> s.isServingSegment(segment)).collect(Collectors.toCollection(TreeSet::new)); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There's still a (now pointless) check for `isServingSegment(segment)` in the loop below. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I left the check in case, in the time between the initial filtering and the loop, something changed and the server was no longer serving the segment, but I can remove the extra check if you think it's safe. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If there is a race between [… the remainder of this comment, including its inline code references, was lost in extraction] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 👍 added |
||
|
||
final Iterator<ServerHolder> iterator = balancerStrategy.pickServersToDrop(segment, isServingSubset); | ||
|
||
while (numDropped < numToDrop) { | ||
if (!iterator.hasNext()) { | ||
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier()); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should have some javadoc explaining what the returned iterator should be. It's non-obvious: the "obvious" return value would be a set of servers to drop from, where the order doesn't matter. But the actual return value is ordered by preferredness, and may contain more servers than we actually want to drop from.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went ahead and added javadoc for whole interface, lmk if I got anything wrong