diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentId.java b/core/src/main/java/org/apache/druid/timeline/SegmentId.java
index 8430524021c0..29b006a92c7e 100644
--- a/core/src/main/java/org/apache/druid/timeline/SegmentId.java
+++ b/core/src/main/java/org/apache/druid/timeline/SegmentId.java
@@ -33,7 +33,6 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.timeline.partition.ShardSpec;
-import org.joda.time.Chronology;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -80,6 +79,12 @@ public final class SegmentId implements Comparable<SegmentId>
    */
  private static final Interner<String> STRING_INTERNER = Interners.newWeakInterner();
 
+  /**
+   * Store the interval, since creating a new Interval each time before returning is an expensive operation.
+   * To decrease the memory required for storing intervals, intern them, since the number of distinct values is "low".
+   */
+  private static final Interner<Interval> INTERVAL_INTERNER = Interners.newWeakInterner();
+
  private static final char DELIMITER = '_';
  private static final Splitter DELIMITER_SPLITTER = Splitter.on(DELIMITER);
  private static final Joiner DELIMITER_JOINER = Joiner.on(DELIMITER);
@@ -258,14 +263,7 @@ public static SegmentId dummy(String dataSource, int partitionNum)
   }
 
  private final String dataSource;
-  /**
-   * {@code intervalStartMillis}, {@link #intervalEndMillis} and {@link #intervalChronology} are the three fields of
-   * an {@link Interval}. Storing them directly to flatten the structure and reduce the heap space consumption.
-   */
-  private final long intervalStartMillis;
-  private final long intervalEndMillis;
-  @Nullable
-  private final Chronology intervalChronology;
+  private final Interval interval;
  private final String version;
  private final int partitionNum;
@@ -278,9 +276,7 @@ public static SegmentId dummy(String dataSource, int partitionNum)
  private SegmentId(String dataSource, Interval interval, String version, int partitionNum)
   {
    this.dataSource = STRING_INTERNER.intern(Objects.requireNonNull(dataSource));
-    this.intervalStartMillis = interval.getStartMillis();
-    this.intervalEndMillis = interval.getEndMillis();
-    this.intervalChronology = interval.getChronology();
+    this.interval = INTERVAL_INTERNER.intern(interval);
    // Versions are timestamp-based Strings, interning of them doesn't make sense. If this is not the case, interning
    // could be conditionally allowed via a system property.
    this.version = Objects.requireNonNull(version);
@@ -297,9 +293,7 @@ private int computeHashCode()
    hashCode = hashCode * 1000003 + version.hashCode();
    hashCode = hashCode * 1000003 + dataSource.hashCode();
-    hashCode = hashCode * 1000003 + Long.hashCode(intervalStartMillis);
-    hashCode = hashCode * 1000003 + Long.hashCode(intervalEndMillis);
-    hashCode = hashCode * 1000003 + Objects.hashCode(intervalChronology);
+    hashCode = hashCode * 1000003 + interval.hashCode();
    return hashCode;
   }
@@ -310,17 +304,17 @@ public String getDataSource()
 
  public DateTime getIntervalStart()
   {
-    return new DateTime(intervalStartMillis, intervalChronology);
+    return new DateTime(interval.getStartMillis(), interval.getChronology());
   }
 
  public DateTime getIntervalEnd()
   {
-    return new DateTime(intervalEndMillis, intervalChronology);
+    return new DateTime(interval.getEndMillis(), interval.getChronology());
   }
 
  public Interval getInterval()
   {
-    return new Interval(intervalStartMillis, intervalEndMillis, intervalChronology);
+    return interval;
   }
 
  public String getVersion()
@@ -340,7 +334,7 @@ public SegmentId withInterval(Interval newInterval)
 
  public SegmentDescriptor toDescriptor()
   {
-    return new SegmentDescriptor(Intervals.utc(intervalStartMillis, intervalEndMillis), version, partitionNum);
+    return new SegmentDescriptor(Intervals.utc(interval.getStartMillis(), interval.getEndMillis()), version, partitionNum);
   }
 
  @Override
@@ -357,9 +351,7 @@ public boolean equals(Object o)
    // are equal as well as all other fields used to compute them, the partitionNums are also guaranteed to be equal.
    return hashCode == that.hashCode &&
           dataSource.equals(that.dataSource) &&
-           intervalStartMillis == that.intervalStartMillis &&
-           intervalEndMillis == that.intervalEndMillis &&
-           Objects.equals(intervalChronology, that.intervalChronology) &&
+           interval.equals(that.interval) &&
           version.equals(that.version);
   }
@@ -376,11 +368,11 @@ public int compareTo(SegmentId o)
    if (result != 0) {
      return result;
    }
-    result = Long.compare(intervalStartMillis, o.intervalStartMillis);
+    result = Long.compare(interval.getStartMillis(), o.interval.getStartMillis());
    if (result != 0) {
      return result;
    }
-    result = Long.compare(intervalEndMillis, o.intervalEndMillis);
+    result = Long.compare(interval.getEndMillis(), o.interval.getEndMillis());
    if (result != 0) {
      return result;
    }
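Reviewer note: for anyone unfamiliar with Guava interners, here is a minimal, self-contained sketch (not part of the patch; the class name and sample interval are invented) of the behavior `INTERVAL_INTERNER` relies on. Equal `Interval` values collapse to a single canonical object, and because the interner holds it weakly, the canonical object can still be garbage-collected once nothing else references it.

```java
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import org.joda.time.Interval;

public class IntervalInternerDemo
{
  // Weak interner: canonical instances are held via weak references,
  // so intervals that fall out of use do not leak.
  private static final Interner<Interval> INTERNER = Interners.newWeakInterner();

  public static void main(String[] args)
  {
    // Two equal but distinct Interval objects.
    Interval a = Interval.parse("2024-01-01T00:00:00Z/2024-01-02T00:00:00Z");
    Interval b = Interval.parse("2024-01-01T00:00:00Z/2024-01-02T00:00:00Z");

    System.out.println(a == b);                                   // false: distinct objects
    System.out.println(INTERNER.intern(a) == INTERNER.intern(b)); // true: one canonical instance
  }
}
```

Interning pays off here because many `SegmentId`s share the same interval (one segment per partition within each time chunk), so the number of distinct intervals stays small, as the new javadoc notes.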
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategy.java
index 22387406f453..50820f7fd56b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategy.java
@@ -20,9 +20,9 @@
 package org.apache.druid.server.coordinator;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.druid.server.coordinator.cost.ClusterCostCache;
+import org.apache.druid.server.coordinator.cost.SegmentsCostCache;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Set;
@@ -59,7 +59,7 @@ protected double computeCost(DataSegment proposalSegment, ServerHolder server, b
    double cost = clusterCostCache.computeCost(serverName, proposalSegment);
 
    // add segments that will be loaded to the cost
-    cost += costCacheForLoadingSegments(server).computeCost(serverName, proposalSegment);
+    cost += costCacheForLoadingSegments(server, proposalSegment);
 
    if (server.getAvailableSize() <= 0) {
      return Double.POSITIVE_INFINITY;
    }
@@ -70,10 +70,19 @@ protected double computeCost(DataSegment proposalSegment, ServerHolder server, b
    return cost * (server.getMaxSize() / server.getAvailableSize());
   }
 
-  private ClusterCostCache costCacheForLoadingSegments(ServerHolder server)
+  private double costCacheForLoadingSegments(ServerHolder server, DataSegment proposalSegment)
   {
+    final double t0 = proposalSegment.getInterval().getStartMillis();
+    final double t1 = (proposalSegment.getInterval().getEndMillis() - t0) / SegmentsCostCache.MILLIS_FACTOR;
+    double costForLoadingSegments = 0d;
    final Set<DataSegment> loadingSegments = server.getPeon().getSegmentsToLoad();
-    return ClusterCostCache.builder(ImmutableMap.of(server.getServer().getName(), loadingSegments)).build();
+    for (DataSegment segment : loadingSegments) {
+      final double start = (segment.getInterval().getStartMillis() - t0) / SegmentsCostCache.MILLIS_FACTOR;
+      final double end = (segment.getInterval().getEndMillis() - t0) / SegmentsCostCache.MILLIS_FACTOR;
+      final double multiplier = segment.getDataSource().equals(proposalSegment.getDataSource()) ? 2d : 1d;
+      costForLoadingSegments += multiplier * intervalCost(t1, start, end);
+    }
+    return costForLoadingSegments;
   }
 }
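The rewritten `costCacheForLoadingSegments` avoids building a throwaway `ClusterCostCache` per call and instead evaluates the joint cost directly, using the same normalization as `SegmentsCostCache`: times are shifted so the proposal segment starts at 0 and divided by `SegmentsCostCache.MILLIS_FACTOR`, which makes the exponential decay rate equal to 1 in the scaled units. For intuition, here is an illustrative numerical approximation (a sketch, not production code; `approxIntervalCost` is a made-up helper) of the integral that `intervalCost(t1, start, end)` evaluates in closed form:

```java
/**
 * Midpoint-rule approximation of the joint cost
 *   cost(X, Y) = integral over X and Y of exp(-|x - y|) dy dx
 * for X = [0, x1) and Y = [y0, y1), in MILLIS_FACTOR-normalized units.
 */
static double approxIntervalCost(double x1, double y0, double y1)
{
  final int steps = 1_000;
  final double dx = x1 / steps;
  final double dy = (y1 - y0) / steps;
  double sum = 0;
  for (int i = 0; i < steps; i++) {
    final double x = (i + 0.5) * dx;               // midpoint of the i-th x slice
    for (int j = 0; j < steps; j++) {
      final double y = y0 + (j + 0.5) * dy;        // midpoint of the j-th y slice
      sum += Math.exp(-Math.abs(x - y)) * dx * dy; // decayed contribution of this cell
    }
  }
  return sum;
}
```

The `2d` multiplier follows the existing convention in the cost strategy of doubling the cost between segments of the same datasource, so that a datasource's segments are spread apart more aggressively.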
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/cost/SegmentsCostCache.java b/server/src/main/java/org/apache/druid/server/coordinator/cost/SegmentsCostCache.java
index 9271de28425b..b2bded7de37a 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/cost/SegmentsCostCache.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/cost/SegmentsCostCache.java
@@ -95,7 +95,7 @@ public class SegmentsCostCache
    */
  private static final double HALF_LIFE_DAYS = 1.0;
  private static final double LAMBDA = Math.log(2) / HALF_LIFE_DAYS;
-  private static final double MILLIS_FACTOR = TimeUnit.DAYS.toMillis(1) / LAMBDA;
+  public static final double MILLIS_FACTOR = TimeUnit.DAYS.toMillis(1) / LAMBDA;
 
   /**
    * LIFE_THRESHOLD is used to avoid calculations for segments that are "far"
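Promoting `MILLIS_FACTOR` to `public` lets `CachingCostBalancerStrategy` reuse the cache's unit convention instead of duplicating the constant. A quick standalone sanity check of that convention (a sketch, not part of the patch): dividing a millisecond offset by `MILLIS_FACTOR` expresses it in units where `exp(-x)` halves once per `HALF_LIFE_DAYS`, so one day maps to ln 2 and decays to exactly 0.5.

```java
import java.util.concurrent.TimeUnit;

public class MillisFactorCheck
{
  public static void main(String[] args)
  {
    final double halfLifeDays = 1.0;                                // HALF_LIFE_DAYS
    final double lambda = Math.log(2) / halfLifeDays;               // LAMBDA
    final double millisFactor = TimeUnit.DAYS.toMillis(1) / lambda; // MILLIS_FACTOR, about 1.2466e8

    // One day in normalized units is ln 2, i.e. exactly one half-life.
    final double oneDayNormalized = TimeUnit.DAYS.toMillis(1) / millisFactor;
    System.out.println(Math.exp(-oneDayNormalized));                // prints 0.5
  }
}
```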