Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve CostBalancerStrategy, deprecate cachingCost #14484

Merged
merged 11 commits into from
Jun 27, 2023

Conversation

kfaraz
Copy link
Contributor

@kfaraz kfaraz commented Jun 24, 2023

Motivation for balancing

With round robin segment assignment now enabled by default, coordinator runs are fast and clusters are generally well balanced. But as the name suggests, these assignments are round-robin and may be sub-optimal as they are not based on any server/segment-specific strategy.

So, in every coordinator run, assignment is immediately followed by balancing to ensure that segments are moved to the most strategic servers to optimize query performance. The balancing decisions are dictated by the strategy being used on the cluster. The most viable and strongly recommended candidate for this is the cost balancer strategy. The other two, cachingCost and diskNormalized were meant to be improvements over cost but come with their own set of challenges.

Limits of cost balancer strategy

The segment balancing algorithm is throttled by maxSegmentsToMove which has been a way to ensure that the coordinator does not get stuck performing too many cost computations. With the recent coordinator improvements, the upper bound for maxSegmentsToMove has now been set at 1000. While this number is good enough for smaller clusters with up to 100k used segments, it poses some problems at larger scales:

  • Moving 1000 segments in each coordinator run may take several hours to attain balance on a large cluster with 1M segments or more.
  • With 1M+ segments, even computations for 1000 segments may take several minutes, thus delaying coordinator cycles (typically expected to be every minute).
  • Accidentally increasing maxSegmentsToMove to higher values (say 5000) has often caused coordinator runs to get stuck for hours, resulting in the entire Druid cluster malfunctioning.

As Druid clusters start handling more segments, this problem only gets compounded.

Thus, there is need for improvement in the performance of the cost balancer strategy.

The cachingCost strategy had been contributed with similar goals in mind but it has since deviated in its method of cost computations and has been known to cause issues, often leading to imbalanced clusters.

Proposed improvements

This PR avoids redundant cost computations by leveraging the following facts:

  • The joint cost of two segments depends on
    • their intervals: A segment of interval I1 would have exactly the same cost with any segment of interval I2. In other words, computed interval costs can be reused.
    • their datasources: if the segments belong to the same datasource, the cost is simply multiplied by a constant factor of 2
  • The joint cost of two segments is negligible when there is enough gap between their intervals. This has also been exploited in cachingCost strategy while bucketing segments into compute intervals. CostBalancerStrategyTest.testJointSegmentsCostWith45DayGap() also proves such costs to be negligible.

Changes to cost strategy

  • In every ServerHolder, track the number of segments per datasource per interval
  • Perform cost computations for a given interval just once, and then multiply by a constant factor to account for the different datasources and total segment count in that interval
cost of segment S of (Datasource1, Interval1) with n segments of (Datasource2, Interval2)
= cost(I1, I2) x n

cost of segment S of (Datasource1, Interval1) with n segments of (Datasource1, Interval2)
= cost(I1, I2) x n x 2

  • Do not perform joint cost computations with segments that are outside the compute interval for the segment being considered for move
cost compute interval for segment = [segment start - 45 days, segment end + 45 days)

These are defensive limits, cachingCost actually uses 30 days as the compute gap.

  • Remove metrics segment/cost/* as they were coordinator killers! Turning on these metrics (by setting emitBalancingStats to true) has often caused the coordinator to be stuck for hours. Moreover, they are too complicated to decipher and do not provide any meaningful insight into the a Druid cluster.
  • The above metrics have been replaced with simpler metrics to track cost computation time, count and errors.

Other changes

  • Remove flaky test from CostBalancerStrategyTest. No other functional change to this test to ensure that the new strategy works as expected.
  • Remove usages of mock BalancerStrategy from LoadRuleTest
  • Clean up BalancerStrategy interface

Testing

All unit tests and simulation tests run successfully with no changes required.

Comparison of simulated run times of the BalanceSegments duty in seconds. Setup: 200 servers, maxSegmentsToMove = 1000, balancerComputeThreads = 1

Implementation Features
A (current implementation) none
B reuse computed interval costs
C ignore cost of far intervals
D (proposed) ignore cost of far intervals and reuse computed interval costs
Total segments Distribution
(intervals x partitions)
A B C D
120,000 120,000 hrs x 1 21.5 23.0 9.2 6.2
120,000 1,000 hrs x 120 22.5 2.5 20.5 2.5
120,000 6,000 days x 20 20.0 12.0 8.9 1.8
1,200,000
maxSegmentsToMove = 100
balancerComputeThreads = 1
6,000 days x 200 21.0 2.5 8.8 2.2
1,200,000
maxSegmentsToMove = 1000
balancerComputeThreads = 4
6,000 days x 200 72.5 6.4 28.1 6.2
1,200,000
maxSegmentsToMove = 1000
balancerComputeThreads = 4
3,000 days x 40
x 10 datasources
92.6 5.6 35.6 5.4

Cluster tests

🟨 PENDING

Release note

The cost balancer strategy performs much better now and is capable of moving more segments in a single coordinator run. These improvements were made by borrowing ideas from the cachingCost strategy.

The cachingCost strategy itself has been known to cause issues and is now deprecated. It is likely to be removed in future Druid releases and must not be used on any cluster.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@kfaraz kfaraz requested a review from AmatyaAvadhanula June 25, 2023 03:50
@AmatyaAvadhanula
Copy link
Contributor

@kfaraz This is a really neat idea.
One other improvement that can borrowed from cachingCost could be to have two kinds of maps

  1. Datasource -> Interval -> count (datasourceMap)
  2. Interval -> count (globalMap)
    Total cost computed would then be the sum of the cost using the globalMap + cost computed using the datasource's subMap. This could be a simple, yet effective optimization when there are a lot of datasources in the cluster

@kfaraz
Copy link
Contributor Author

kfaraz commented Jun 26, 2023

Thanks for the suggestion, @AmatyaAvadhanula !
I think the improvement you suggest is already implemented here, i.e. there is only one cost computation per interval (even if we have multiple datasources).

// Compute number of segments in each interval
final Object2IntOpenHashMap<Interval> intervalToSegmentCount = new Object2IntOpenHashMap<>();
server.getProjectedSegments().getDatasourceIntervalToSegmentCount().forEach(
(datasource, intervalToCount) -> {
// Segments of the same datasource as the proposalSegment count as 2
final int multiplier = segmentDatasource.equals(datasource) ? 2 : 1;
intervalToCount.object2IntEntrySet().fastForEach(entry -> {
final Interval interval = entry.getKey();
if (costComputeInterval.overlaps(interval)) {
intervalToSegmentCount.addTo(interval, multiplier * entry.getIntValue());
}
});
}
);
// Compute joint cost for each interval
double cost = 0;
final Interval segmentInterval = proposalSegment.getInterval();
cost += intervalToSegmentCount.object2IntEntrySet().stream().mapToDouble(
entry -> intervalCost(segmentInterval, entry.getKey())
* entry.getIntValue()
).sum();

@@ -334,42 +367,10 @@ public void testFindServerAfterExecutorShutdownThrowsException()
);
}

@Test(timeout = 90_000L)
public void testFindServerRaisesAlertOnTimeout()
private void verifyPlacementCost(DataSegment segment, ServerHolder server, double expectedCost)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we manually pass the expectedCost?
Can we not compare the results of the method computePlacementCost with the sum of individual costs of the candidate segment across all segments on the server (i.e the original implementation)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that can be another assertion in this test or another test.

This particular test was added to ensure that cost remains the same even when we make changes to the way cost is computed, such as in this PR. That is why the exact numerical literals have been used.

@@ -431,7 +433,7 @@ private int dropReplicas(
Iterator<ServerHolder> serverIterator =
(useRoundRobinAssignment || eligibleLiveServers.size() >= remainingNumToDrop)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this beeligibleLiveServers.size() <= remainingNumToDrop ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think so. Thanks for catching this!

Copy link
Contributor

@AmatyaAvadhanula AmatyaAvadhanula left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, LGTM!

@kfaraz
Copy link
Contributor Author

kfaraz commented Jun 27, 2023

Thanks a lot for the feedback, @AmatyaAvadhanula !!

@kfaraz kfaraz merged commit 4bd6bd0 into apache:master Jun 27, 2023
@kfaraz kfaraz deleted the improve_cost_strategy branch June 27, 2023 07:53
@abhishekagarwal87 abhishekagarwal87 added this to the 27.0 milestone Jul 19, 2023
sergioferragut pushed a commit to sergioferragut/druid that referenced this pull request Jul 21, 2023
Changes to `cost` strategy:
- In every `ServerHolder`, track the number of segments per datasource per interval
- Perform cost computations for a given interval just once, and then multiply by a constant
factor to account for the total segment count in that interval
- Do not perform joint cost computations with segments that are outside the compute interval
(± 45 days) for the segment being considered for move
- Remove metrics `segment/cost/*` as they were coordinator killers! Turning on these metrics
(by setting `emitBalancingStats` to true) has often caused the coordinator to be stuck for hours.
Moreover, they are too complicated to decipher and do not provide any meaningful insight into
a Druid cluster.
- Add new simpler metrics `segment/balancer/compute/*` to track cost computation time,
count and errors.

Other changes:
- Remove flaky test from `CostBalancerStrategyTest`.
- Add tests to verify that computed cost has remained unchanged
- Remove usages of mock `BalancerStrategy` from `LoadRuleTest`, `BalanceSegmentsTest`
- Clean up `BalancerStrategy` interface
kfaraz added a commit that referenced this pull request Aug 16, 2023
`cachingCost` has been deprecated in #14484 and is not advised to be used in
production clusters as it may cause usage skew across historicals which the
coordinator is unable to rectify. This PR completely disables `cachingCost` strategy
as it has now been rendered redundant due to recent performance improvements
made to `cost` strategy.

Changes
- Disable `cachingCost` strategy
- Add `DisabledCachingCostBalancerStrategyFactory` for the time being so that we
can give a proper error message before falling back to `CostBalancerStrategy`. This
will be removed in subsequent releases.
- Retain `CachingCostBalancerStrategy` for testing/benchmarking purposes.
- Add javadocs to `DiskNormalizedCostBalancerStrategy`
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants