Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce method invocation of reservoir sampling #11257

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import io.vavr.collection.Stream;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * JMH benchmark measuring how long {@link BalancerStrategy#pickSegmentsToMove} takes to hand out up to
 * {@code maxSegmentsToMove} segments from a synthetic cluster of {@link #NUMBER_OF_SERVERS} servers.
 *
 * Three modes are compared:
 * <ul>
 *   <li>{@code default}: reservoir size 1, 100 percent of segments considered</li>
 *   <li>{@code 50percentOfSegmentsToConsiderPerMove}: reservoir size 1, 50 percent of segments considered</li>
 *   <li>{@code useBatchedSegmentSampler}: reservoir size equal to {@code maxSegmentsToMove}</li>
 * </ul>
 */
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 10)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class BalancerStrategyBenchmark
{
  // Fixed seed so that the distribution of segments over servers is reproducible.
  private static final Random RANDOM = new Random(0);
  private static final Interval TEST_SEGMENT_INTERVAL = Intervals.of("2012-03-15T00:00:00.000/2012-03-16T00:00:00.000");
  private static final int NUMBER_OF_SERVERS = 20;

  @Param({"default", "50percentOfSegmentsToConsiderPerMove", "useBatchedSegmentSampler"})
  private String mode;

  @Param({"10000", "100000", "1000000"})
  private int numberOfSegments;

  @Param({"10", "100", "1000"})
  private int maxSegmentsToMove;

  private final List<ServerHolder> serverHolders = new ArrayList<>();
  private int reservoirSize = 1;
  private double percentOfSegmentsToConsider = 100;

  // The pool is never shut down by this benchmark, so its worker is created as a daemon thread: a non-daemon
  // worker would stay alive after the measurements are done and keep the forked benchmark JVM from exiting.
  private final BalancerStrategy balancerStrategy = new CostBalancerStrategy(
      MoreExecutors.listeningDecorator(
          Executors.newFixedThreadPool(
              1,
              runnable -> {
                Thread worker = new Thread(runnable, "BalancerStrategyBenchmark-balancer");
                worker.setDaemon(true);
                return worker;
              }
          )
      )
  );

  /**
   * Translates {@link #mode} into sampler arguments and builds the synthetic cluster: {@link #numberOfSegments}
   * segments spread at random over {@link #NUMBER_OF_SERVERS} servers.
   */
  @Setup(Level.Trial)
  public void setup() throws IOException
  {
    switch (mode) {
      case "50percentOfSegmentsToConsiderPerMove":
        percentOfSegmentsToConsider = 50;
        break;
      case "useBatchedSegmentSampler":
        reservoirSize = maxSegmentsToMove;
        break;
      default:
        // "default" mode keeps the field initializers: reservoir size 1 and 100 percent of segments.
    }

    // One segment bucket per server.
    List<List<DataSegment>> segmentList = new ArrayList<>(NUMBER_OF_SERVERS);
    Stream.range(0, NUMBER_OF_SERVERS).forEach(i -> segmentList.add(new ArrayList<>()));
    for (int i = 0; i < numberOfSegments; i++) {
      // All segments share the datasource name and interval; the loop index is what makes each one distinct.
      segmentList.get(RANDOM.nextInt(NUMBER_OF_SERVERS)).add(
          new DataSegment(
              "test",
              TEST_SEGMENT_INTERVAL,
              String.valueOf(i),
              Collections.emptyMap(),
              Collections.emptyList(),
              Collections.emptyList(),
              null,
              0,
              10L
          )
      );
    }

    for (List<DataSegment> segments : segmentList) {
      serverHolders.add(
          new ServerHolder(
              new ImmutableDruidServer(
                  new DruidServerMetadata("id", "host", null, 10000000L, ServerType.HISTORICAL, "hot", 1),
                  3000L,
                  ImmutableMap.of("test", new ImmutableDruidDataSource("test", Collections.emptyMap(), segments)),
                  segments.size()
              ),
              new LoadQueuePeonTester()
          )
      );
    }
  }

  /**
   * Pulls at most {@link #maxSegmentsToMove} holders from the strategy's iterator and feeds each one to the
   * blackhole so the work cannot be optimized away.
   */
  @Benchmark
  public void pickSegmentsToMove(Blackhole blackhole)
  {
    Iterator<BalancerSegmentHolder> iterator = balancerStrategy.pickSegmentsToMove(
        serverHolders,
        Collections.emptySet(),
        reservoirSize,
        percentOfSegmentsToConsider
    );
    // The loop bound, not the iterator, is what limits the number of picks.
    for (int i = 0; i < maxSegmentsToMove && iterator.hasNext(); i++) {
      blackhole.consume(iterator.next());
    }
  }
}
2 changes: 2 additions & 0 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,7 @@ A sample Coordinator dynamic config JSON object is shown below:
"mergeBytesLimit": 100000000,
"mergeSegmentsLimit" : 1000,
"maxSegmentsToMove": 5,
"useBatchedSegmentSampler": false,
"percentOfSegmentsToConsiderPerMove": 100,
"replicantLifetime": 15,
"replicationThrottleLimit": 10,
Expand All @@ -830,6 +831,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to merge.|524288000L|
|`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100|
|`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5|
|`useBatchedSegmentSampler`|Boolean flag for whether or not to use reservoir sampling with a reservoir of size k, instead of a fixed size of 1, to pick segments to move. This option can be enabled to speed up the segment balancing process, especially if there is a huge number of segments in the cluster or if there are many segments to move.|false|
|`percentOfSegmentsToConsiderPerMove`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move.|100|
|`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15|
|`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,90 @@ public interface BalancerStrategy
ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List<ServerHolder> serverHolders);

/**
* Pick the best segment to move from one of the supplied set of servers according to the balancing strategy.
* Pick the best segments to move from one of the supplied set of servers according to the balancing strategy.
*
* @param serverHolders set of historicals to consider for moving segments
* @param broadcastDatasources Datasources that contain segments which were loaded via broadcast rules.
* Balancing strategies should avoid rebalancing segments for such datasources, since
* they should be loaded on all servers anyway.
* NOTE: this should really be handled on a per-segment basis, to properly support
* the interval or period-based broadcast rules. For simplicity of the initial
* implementation, only forever broadcast rules are supported.
* @param reservoirSize the reservoir size maintained by the Reservoir Sampling algorithm.
* @param percentOfSegmentsToConsider The percentage of the total number of segments that we will consider when
* choosing which segment to move. {@link CoordinatorDynamicConfig} defines a
* config percentOfSegmentsToConsiderPerMove that will be used as an argument
* for implementations of this method.
*
* @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if
* there are no segments to pick from (i. e. all provided serverHolders are empty).
* @return Iterator over {@link BalancerSegmentHolder}s, each containing a segment to move and the server it currently
* resides on; the iterator is empty if there are no segments to pick from (i. e. all provided serverHolders are empty).
*/
@Nullable
BalancerSegmentHolder pickSegmentToMove(
default Iterator<BalancerSegmentHolder> pickSegmentsToMove(
List<ServerHolder> serverHolders,
Set<String> broadcastDatasources,
int reservoirSize,
double percentOfSegmentsToConsider
);
)
{
if (reservoirSize > 1) {
  // Batched mode: segments are sampled reservoirSize at a time. Note that percentOfSegmentsToConsider is not
  // passed to the batched sampler.
  return new Iterator<BalancerSegmentHolder>()
  {
    // Iterator over the most recently sampled batch.
    private Iterator<BalancerSegmentHolder> it = sample();

    // Draws a fresh batch from the sampler and returns an iterator over it.
    private Iterator<BalancerSegmentHolder> sample()
    {
      return ReservoirSegmentSampler.getRandomBalancerSegmentHolders(
          serverHolders,
          broadcastDatasources,
          reservoirSize
      ).iterator();
    }

    @Override
    public boolean hasNext()
    {
      if (it.hasNext()) {
        return true;
      }
      // The current batch is used up: sample again. Iteration therefore only ends when a fresh batch comes back
      // empty, so callers are expected to bound the number of elements they pull.
      it = sample();
      return it.hasNext();
    }

    @Override
    public BalancerSegmentHolder next()
    {
      // Go through hasNext() so that a used-up batch is refilled even if the caller did not call hasNext() first,
      // and so that exhaustion is reported as required by the Iterator contract.
      if (!hasNext()) {
        throw new java.util.NoSuchElementException();
      }
      return it.next();
    }
  };
}

// Single-sample mode: one segment is sampled per element, one step ahead of the caller.
return new Iterator<BalancerSegmentHolder>()
{
  // Look-ahead element; null means the sampler had nothing to offer and the iteration is over.
  private BalancerSegmentHolder next = sample();

  // Asks the sampler for one holder, honoring percentOfSegmentsToConsider.
  private BalancerSegmentHolder sample()
  {
    return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(
        serverHolders,
        broadcastDatasources,
        percentOfSegmentsToConsider
    );
  }

  @Override
  public boolean hasNext()
  {
    return next != null;
  }

  @Override
  public BalancerSegmentHolder next()
  {
    // Iterator contract: report exhaustion with an exception instead of handing out null.
    if (!hasNext()) {
      throw new java.util.NoSuchElementException();
    }

    BalancerSegmentHolder ret = next;
    next = sample();
    return ret;
  }
};
}

/**
* Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class CoordinatorDynamicConfig
private final int mergeSegmentsLimit;
private final int maxSegmentsToMove;
private final double percentOfSegmentsToConsiderPerMove;
private final boolean useBatchedSegmentSampler;
private final int replicantLifetime;
private final int replicationThrottleLimit;
private final int balancerComputeThreads;
Expand Down Expand Up @@ -115,6 +116,7 @@ public CoordinatorDynamicConfig(
@JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") int maxSegmentsToMove,
@JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove,
@JsonProperty("useBatchedSegmentSampler") boolean useBatchedSegmentSampler,
@JsonProperty("replicantLifetime") int replicantLifetime,
@JsonProperty("replicationThrottleLimit") int replicationThrottleLimit,
@JsonProperty("balancerComputeThreads") int balancerComputeThreads,
Expand Down Expand Up @@ -161,6 +163,7 @@ public CoordinatorDynamicConfig(
);
this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove;

this.useBatchedSegmentSampler = useBatchedSegmentSampler;
this.replicantLifetime = replicantLifetime;
this.replicationThrottleLimit = replicationThrottleLimit;
this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
Expand Down Expand Up @@ -272,6 +275,12 @@ public double getPercentOfSegmentsToConsiderPerMove()
return percentOfSegmentsToConsiderPerMove;
}

/**
* Returns the value of the {@code useBatchedSegmentSampler} flag held by this config. The method is annotated with
* {@link JsonProperty}, so the flag is part of the serialized form of the config.
*/
@JsonProperty
public boolean useBatchedSegmentSampler()
{
return useBatchedSegmentSampler;
}

@JsonProperty
public int getReplicantLifetime()
{
Expand Down Expand Up @@ -377,6 +386,7 @@ public String toString()
", mergeSegmentsLimit=" + mergeSegmentsLimit +
", maxSegmentsToMove=" + maxSegmentsToMove +
", percentOfSegmentsToConsiderPerMove=" + percentOfSegmentsToConsiderPerMove +
", useBatchedSegmentSampler=" + useBatchedSegmentSampler +
", replicantLifetime=" + replicantLifetime +
", replicationThrottleLimit=" + replicationThrottleLimit +
", balancerComputeThreads=" + balancerComputeThreads +
Expand Down Expand Up @@ -421,6 +431,9 @@ public boolean equals(Object o)
if (percentOfSegmentsToConsiderPerMove != that.percentOfSegmentsToConsiderPerMove) {
return false;
}
if (useBatchedSegmentSampler != that.useBatchedSegmentSampler) {
return false;
}
if (replicantLifetime != that.replicantLifetime) {
return false;
}
Expand Down Expand Up @@ -469,6 +482,7 @@ public int hashCode()
mergeSegmentsLimit,
maxSegmentsToMove,
percentOfSegmentsToConsiderPerMove,
useBatchedSegmentSampler,
replicantLifetime,
replicationThrottleLimit,
balancerComputeThreads,
Expand Down Expand Up @@ -501,6 +515,7 @@ public static class Builder
private static final int DEFAULT_REPLICATION_THROTTLE_LIMIT = 10;
private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1;
private static final boolean DEFAULT_EMIT_BALANCING_STATS = false;
private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = false;
private static final boolean DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES = false;
private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0;
private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
Expand All @@ -513,6 +528,7 @@ public static class Builder
private Integer mergeSegmentsLimit;
private Integer maxSegmentsToMove;
private Double percentOfSegmentsToConsiderPerMove;
private Boolean useBatchedSegmentSampler;
private Integer replicantLifetime;
private Integer replicationThrottleLimit;
private Boolean emitBalancingStats;
Expand All @@ -539,6 +555,7 @@ public Builder(
@JsonProperty("mergeSegmentsLimit") @Nullable Integer mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove,
@JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove,
@JsonProperty("useBatchedSegmentSampler") Boolean useBatchedSegmentSampler,
@JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime,
@JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit,
@JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads,
Expand All @@ -561,6 +578,7 @@ public Builder(
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.maxSegmentsToMove = maxSegmentsToMove;
this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove;
this.useBatchedSegmentSampler = useBatchedSegmentSampler;
this.replicantLifetime = replicantLifetime;
this.replicationThrottleLimit = replicationThrottleLimit;
this.balancerComputeThreads = balancerComputeThreads;
Expand Down Expand Up @@ -606,6 +624,12 @@ public Builder withPercentOfSegmentsToConsiderPerMove(double percentOfSegmentsTo
return this;
}

/**
* Sets the {@code useBatchedSegmentSampler} flag on this builder.
*
* @param useBatchedSegmentSampler value to store in the builder
* @return this builder, to allow call chaining
*/
public Builder withUseBatchedSegmentSampler(boolean useBatchedSegmentSampler)
{
this.useBatchedSegmentSampler = useBatchedSegmentSampler;
return this;
}

public Builder withReplicantLifetime(int replicantLifetime)
{
this.replicantLifetime = replicantLifetime;
Expand Down Expand Up @@ -689,6 +713,7 @@ public CoordinatorDynamicConfig build()
maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE : maxSegmentsToMove,
percentOfSegmentsToConsiderPerMove == null ? DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
: percentOfSegmentsToConsiderPerMove,
useBatchedSegmentSampler == null ? DEFAULT_USE_BATCHED_SEGMENT_SAMPLER : useBatchedSegmentSampler,
replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME : replicantLifetime,
replicationThrottleLimit == null ? DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit,
balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : balancerComputeThreads,
Expand Down Expand Up @@ -721,6 +746,7 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : mergeSegmentsLimit,
maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : maxSegmentsToMove,
percentOfSegmentsToConsiderPerMove == null ? defaults.getPercentOfSegmentsToConsiderPerMove() : percentOfSegmentsToConsiderPerMove,
useBatchedSegmentSampler == null ? defaults.useBatchedSegmentSampler() : useBatchedSegmentSampler,
replicantLifetime == null ? defaults.getReplicantLifetime() : replicantLifetime,
replicationThrottleLimit == null ? defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
balancerComputeThreads == null ? defaults.getBalancerComputeThreads() : balancerComputeThreads,
Expand Down
Loading