Avoid unnecessary cache building for cachingCost #12465
First changed file: SegmentId
```diff
@@ -33,7 +33,6 @@
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.timeline.partition.ShardSpec;
-import org.joda.time.Chronology;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
```
```diff
@@ -80,6 +79,12 @@ public final class SegmentId implements Comparable<SegmentId>
    */
   private static final Interner<String> STRING_INTERNER = Interners.newWeakInterner();
 
+  /**
+   * Store Intervals since creating them each time before returning is an expensive operation
+   * To decrease the memory required for storing intervals, intern them, since the number of distinct values is "low"
+   */
+  private static final Interner<Interval> INTERVAL_INTERNER = Interners.newWeakInterner();
+
   private static final char DELIMITER = '_';
   private static final Splitter DELIMITER_SPLITTER = Splitter.on(DELIMITER);
   private static final Joiner DELIMITER_JOINER = Joiner.on(DELIMITER);
```
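For context, a minimal standalone sketch (not part of this diff) of how a Guava weak interner deduplicates equal values; the class name and main method below are made up for illustration only:

```java
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import org.joda.time.Interval;

public class IntervalInternerDemo
{
  // Weak interner: canonical instances are held via weak references, so an interned
  // Interval can be garbage-collected once nothing references it anymore.
  private static final Interner<Interval> INTERNER = Interners.newWeakInterner();

  public static void main(String[] args)
  {
    Interval a = new Interval(0L, 3_600_000L);
    Interval b = new Interval(0L, 3_600_000L);
    System.out.println(a == b);                                   // false: two distinct objects
    System.out.println(INTERNER.intern(a) == INTERNER.intern(b)); // true: one canonical instance
  }
}
```

The memory saving relies on the javadoc's assumption that the number of distinct interval values is low; each distinct value still keeps one canonical copy alive while it is in use.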
```diff
@@ -258,14 +263,7 @@ public static SegmentId dummy(String dataSource, int partitionNum)
   }
 
   private final String dataSource;
-  /**
-   * {@code intervalStartMillis}, {@link #intervalEndMillis} and {@link #intervalChronology} are the three fields of
-   * an {@link Interval}. Storing them directly to flatten the structure and reduce the heap space consumption.
-   */
-  private final long intervalStartMillis;
-  private final long intervalEndMillis;
-  @Nullable
-  private final Chronology intervalChronology;
+  private final Interval interval;
   private final String version;
   private final int partitionNum;
```
```diff
@@ -278,9 +276,7 @@ public static SegmentId dummy(String dataSource, int partitionNum)
   private SegmentId(String dataSource, Interval interval, String version, int partitionNum)
   {
     this.dataSource = STRING_INTERNER.intern(Objects.requireNonNull(dataSource));
-    this.intervalStartMillis = interval.getStartMillis();
-    this.intervalEndMillis = interval.getEndMillis();
-    this.intervalChronology = interval.getChronology();
+    this.interval = INTERVAL_INTERNER.intern(interval);
     // Versions are timestamp-based Strings, interning of them doesn't make sense. If this is not the case, interning
     // could be conditionally allowed via a system property.
     this.version = Objects.requireNonNull(version);
```

Review comment on `this.interval = INTERVAL_INTERNER.intern(interval);`: Can interval ever be null here?
```diff
@@ -297,9 +293,7 @@ private int computeHashCode()
     hashCode = hashCode * 1000003 + version.hashCode();
 
     hashCode = hashCode * 1000003 + dataSource.hashCode();
-    hashCode = hashCode * 1000003 + Long.hashCode(intervalStartMillis);
-    hashCode = hashCode * 1000003 + Long.hashCode(intervalEndMillis);
-    hashCode = hashCode * 1000003 + Objects.hashCode(intervalChronology);
+    hashCode = hashCode * 1000003 + interval.hashCode();
     return hashCode;
   }
```
```diff
@@ -310,17 +304,17 @@ public String getDataSource()
 
   public DateTime getIntervalStart()
   {
-    return new DateTime(intervalStartMillis, intervalChronology);
+    return new DateTime(interval.getStartMillis(), interval.getChronology());
   }
 
   public DateTime getIntervalEnd()
   {
-    return new DateTime(intervalEndMillis, intervalChronology);
+    return new DateTime(interval.getEndMillis(), interval.getChronology());
   }
 
   public Interval getInterval()
   {
-    return new Interval(intervalStartMillis, intervalEndMillis, intervalChronology);
+    return interval;
   }
 
   public String getVersion()
```
```diff
@@ -340,7 +334,7 @@ public SegmentId withInterval(Interval newInterval)
 
   public SegmentDescriptor toDescriptor()
   {
-    return new SegmentDescriptor(Intervals.utc(intervalStartMillis, intervalEndMillis), version, partitionNum);
+    return new SegmentDescriptor(Intervals.utc(interval.getStartMillis(), interval.getEndMillis()), version, partitionNum);
   }
 
   @Override
```
```diff
@@ -357,9 +351,7 @@ public boolean equals(Object o)
     // are equal as well as all other fields used to compute them, the partitionNums are also guaranteed to be equal.
     return hashCode == that.hashCode &&
            dataSource.equals(that.dataSource) &&
-           intervalStartMillis == that.intervalStartMillis &&
-           intervalEndMillis == that.intervalEndMillis &&
-           Objects.equals(intervalChronology, that.intervalChronology) &&
+           interval.equals(that.interval) &&
            version.equals(that.version);
   }
```
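One supporting detail, based on my reading of Joda-Time rather than anything stated in the diff: Interval equality is value-based over start millis, end millis, and chronology, so the single interval.equals(that.interval) call covers the same three components the removed lines compared field by field. A quick standalone check:

```java
Interval x = new Interval(0L, 1_000L);
Interval y = new Interval(0L, 1_000L);
System.out.println(x.equals(y)); // true: same start, end, and chronology
System.out.println(x == y);      // false: still two distinct objects
```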
```diff
@@ -376,11 +368,11 @@ public int compareTo(SegmentId o)
     if (result != 0) {
       return result;
     }
-    result = Long.compare(intervalStartMillis, o.intervalStartMillis);
+    result = Long.compare(interval.getStartMillis(), o.interval.getStartMillis());
     if (result != 0) {
       return result;
     }
-    result = Long.compare(intervalEndMillis, o.intervalEndMillis);
+    result = Long.compare(interval.getEndMillis(), o.interval.getEndMillis());
     if (result != 0) {
       return result;
     }
```
Second changed file: the caching cost balancer strategy (package org.apache.druid.server.coordinator)
```diff
@@ -20,9 +20,9 @@
 package org.apache.druid.server.coordinator;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.druid.server.coordinator.cost.ClusterCostCache;
+import org.apache.druid.server.coordinator.cost.SegmentsCostCache;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Set;
```
```diff
@@ -59,7 +59,7 @@ protected double computeCost(DataSegment proposalSegment, ServerHolder server, b
     double cost = clusterCostCache.computeCost(serverName, proposalSegment);
 
     // add segments that will be loaded to the cost
-    cost += costCacheForLoadingSegments(server).computeCost(serverName, proposalSegment);
+    cost += costCacheForLoadingSegments(server, proposalSegment);
 
     if (server.getAvailableSize() <= 0) {
       return Double.POSITIVE_INFINITY;
```
```diff
@@ -70,10 +70,19 @@ protected double computeCost(DataSegment proposalSegment, ServerHolder server, b
     return cost * (server.getMaxSize() / server.getAvailableSize());
   }
 
-  private ClusterCostCache costCacheForLoadingSegments(ServerHolder server)
+  private double costCacheForLoadingSegments(ServerHolder server, DataSegment proposalSegment)
   {
+    final double t0 = proposalSegment.getInterval().getStartMillis();
+    final double t1 = (proposalSegment.getInterval().getEndMillis() - t0) / SegmentsCostCache.MILLIS_FACTOR;
+    double costForLoadingSegments = 0d;
     final Set<DataSegment> loadingSegments = server.getPeon().getSegmentsToLoad();
-    return ClusterCostCache.builder(ImmutableMap.of(server.getServer().getName(), loadingSegments)).build();
+    for (DataSegment segment : loadingSegments) {
+      final double start = (segment.getInterval().getStartMillis() - t0) / SegmentsCostCache.MILLIS_FACTOR;
+      final double end = (segment.getInterval().getEndMillis() - t0) / SegmentsCostCache.MILLIS_FACTOR;
+      final double multiplier = segment.getDataSource().equals(proposalSegment.getDataSource()) ? 2d : 1d;
+      costForLoadingSegments += multiplier * intervalCost(t1, start, end);
+    }
+    return costForLoadingSegments;
   }
 }
```

Review comment on the renamed method `costCacheForLoadingSegments(ServerHolder server, DataSegment proposalSegment)`: Nit: Rename to …
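For readers less familiar with the cost model: assuming intervalCost here is the exponential-decay kernel from Druid's CostBalancerStrategy (an assumption based on the surrounding code, not something this diff states), the helper evaluates, for the proposal interval normalized to [0, t1] and each loading segment's interval normalized to [start, end] (both shifted by t0 and scaled by SegmentsCostCache.MILLIS_FACTOR):

$$
\text{cost} \;=\; \sum_{\text{loading segments}} m \int_{0}^{t_1} \int_{\text{start}}^{\text{end}} e^{-|x-y|}\, dy\, dx,
\qquad
m = \begin{cases} 2 & \text{same datasource as the proposal} \\ 1 & \text{otherwise} \end{cases}
$$

The point of the change is that this sum is now computed directly against the segments currently in the load queue, instead of constructing and immediately discarding a ClusterCostCache on every computeCost call, which is the unnecessary cache building the PR title refers to.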
Review comment: Thanks for adding this!