Skip to content

Commit

Permalink
Use binary search to improve DimensionRangeShardSpec lookup (#12417)
Browse files Browse the repository at this point in the history
If there are many shards, mapper of IndexGeneratorJob seems to spend a lot of time in calling
DimensionRangeShardSpec.isInChunk to lookup target shard. This can be significantly improved
by using binary search instead of comparing an input row to every shardSpec.

Changes:
* Add `BaseDimensionRangeShardSpec` which provides a binary-search-based
   implementation for `createLookup`
* `DimensionRangeShardSpec`, `SingleDimensionShardSpec`, and 
   `DimensionRangeBucketShardSpec` now extend `BaseDimensionRangeShardSpec`
  • Loading branch information
hqx871 authored Apr 15, 2022
1 parent cd6fba2 commit a22d413
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 512 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.timeline.partition;

import com.google.common.collect.Ordering;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Comparators;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public abstract class BaseDimensionRangeShardSpec implements ShardSpec
{
protected final List<String> dimensions;
@Nullable
protected final StringTuple start;
@Nullable
protected final StringTuple end;

protected BaseDimensionRangeShardSpec(
List<String> dimensions,
@Nullable StringTuple start,
@Nullable StringTuple end
)
{
this.dimensions = dimensions;
this.start = start;
this.end = end;
}

@Override
public ShardSpecLookup getLookup(final List<? extends ShardSpec> shardSpecs)
{
return createLookup(dimensions, shardSpecs);
}

private static ShardSpecLookup createLookup(List<String> dimensions, List<? extends ShardSpec> shardSpecs)
{
BaseDimensionRangeShardSpec[] rangeShardSpecs = new BaseDimensionRangeShardSpec[shardSpecs.size()];
for (int i = 0; i < shardSpecs.size(); i++) {
rangeShardSpecs[i] = (BaseDimensionRangeShardSpec) shardSpecs.get(i);
}
final Comparator<StringTuple> startComparator = Comparators.naturalNullsFirst();
final Comparator<StringTuple> endComparator = Ordering.natural().nullsLast();

final Comparator<BaseDimensionRangeShardSpec> shardSpecComparator = Comparator
.comparing((BaseDimensionRangeShardSpec spec) -> spec.start, startComparator)
.thenComparing(spec -> spec.end, endComparator);

Arrays.sort(rangeShardSpecs, shardSpecComparator);

return (long timestamp, InputRow row) -> {
StringTuple inputRowTuple = getInputRowTuple(dimensions, row);
int startIndex = 0;
int endIndex = shardSpecs.size() - 1;
while (startIndex <= endIndex) {
int mid = (startIndex + endIndex) >>> 1;
BaseDimensionRangeShardSpec rangeShardSpec = rangeShardSpecs[mid];
if (startComparator.compare(inputRowTuple, rangeShardSpec.start) < 0) {
endIndex = mid - 1;
} else if (endComparator.compare(inputRowTuple, rangeShardSpec.end) < 0) {
return rangeShardSpec;
} else {
startIndex = mid + 1;
}
}
throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
};
}

protected static StringTuple getInputRowTuple(List<String> dimensions, InputRow inputRow)
{
final String[] inputDimensionValues = new String[dimensions.size()];
for (int i = 0; i < dimensions.size(); ++i) {
// Get the values of this dimension, treat multiple values as null
List<String> values = inputRow.getDimension(dimensions.get(i));
inputDimensionValues[i] = values != null && values.size() == 1 ? values.get(0) : null;
}

return StringTuple.create(inputDimensionValues);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.ISE;

import javax.annotation.Nullable;
import java.util.List;
Expand All @@ -40,14 +38,10 @@
* @see BuildingSingleDimensionShardSpec
* @see BuildingDimensionRangeShardSpec
*/
public class DimensionRangeBucketShardSpec implements BucketNumberedShardSpec<BuildingDimensionRangeShardSpec>
public class DimensionRangeBucketShardSpec extends BaseDimensionRangeShardSpec
implements BucketNumberedShardSpec<BuildingDimensionRangeShardSpec>
{
private final int bucketId;
private final List<String> dimensions;
@Nullable
private final StringTuple start;
@Nullable
private final StringTuple end;

@JsonCreator
public DimensionRangeBucketShardSpec(
Expand All @@ -57,6 +51,7 @@ public DimensionRangeBucketShardSpec(
@JsonProperty("end") @Nullable StringTuple end
)
{
super(dimensions, start, end);
// Verify that the tuple sizes and number of dimensions are the same
Preconditions.checkArgument(
start == null || start.size() == dimensions.size(),
Expand All @@ -68,9 +63,6 @@ public DimensionRangeBucketShardSpec(
);

this.bucketId = bucketId;
this.dimensions = dimensions;
this.start = start;
this.end = end;
}

@Override
Expand Down Expand Up @@ -119,24 +111,6 @@ public BuildingDimensionRangeShardSpec convert(int partitionId)
);
}

@Override
public ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs)
{
return (long timestamp, InputRow row) -> {
for (ShardSpec spec : shardSpecs) {
if (((DimensionRangeBucketShardSpec) spec).isInChunk(row)) {
return spec;
}
}
throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
};
}

private boolean isInChunk(InputRow inputRow)
{
return DimensionRangeShardSpec.isInChunk(dimensions, start, end, inputRow);
}

@Override
public String getType()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.ISE;

import javax.annotation.Nullable;
import java.util.Collections;
Expand All @@ -38,15 +36,10 @@
/**
* {@link ShardSpec} for partitioning based on ranges of one or more dimensions.
*/
public class DimensionRangeShardSpec implements ShardSpec
public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec
{
public static final int UNKNOWN_NUM_CORE_PARTITIONS = -1;

private final List<String> dimensions;
@Nullable
private final StringTuple start;
@Nullable
private final StringTuple end;
private final int partitionNum;
private final int numCorePartitions;

Expand All @@ -65,15 +58,13 @@ public DimensionRangeShardSpec(
@JsonProperty("numCorePartitions") @Nullable Integer numCorePartitions // nullable for backward compatibility
)
{
super(dimensions, start, end);
Preconditions.checkArgument(partitionNum >= 0, "partitionNum >= 0");
Preconditions.checkArgument(
dimensions != null && !dimensions.isEmpty(),
"dimensions should be non-null and non-empty"
);

this.dimensions = dimensions;
this.start = start;
this.end = end;
this.partitionNum = partitionNum;
this.numCorePartitions = numCorePartitions == null ? UNKNOWN_NUM_CORE_PARTITIONS : numCorePartitions;
}
Expand Down Expand Up @@ -117,24 +108,6 @@ public boolean isNumCorePartitionsUnknown()
return numCorePartitions == UNKNOWN_NUM_CORE_PARTITIONS;
}

@Override
public ShardSpecLookup getLookup(final List<? extends ShardSpec> shardSpecs)
{
return createLookup(shardSpecs);
}

private static ShardSpecLookup createLookup(List<? extends ShardSpec> shardSpecs)
{
return (long timestamp, InputRow row) -> {
for (ShardSpec spec : shardSpecs) {
if (((DimensionRangeShardSpec) spec).isInChunk(row)) {
return spec;
}
}
throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
};
}

@Override
public List<String> getDomainDimensions()
{
Expand Down Expand Up @@ -279,33 +252,6 @@ public <T> PartitionChunk<T> createChunk(T obj)
}
}

private boolean isInChunk(InputRow inputRow)
{
return isInChunk(dimensions, start, end, inputRow);
}

public static boolean isInChunk(
List<String> dimensions,
@Nullable StringTuple start,
@Nullable StringTuple end,
InputRow inputRow
)
{
final String[] inputDimensionValues = new String[dimensions.size()];
for (int i = 0; i < dimensions.size(); ++i) {
// Get the values of this dimension, treat multiple values as null
List<String> values = inputRow.getDimension(dimensions.get(i));
inputDimensionValues[i] = values != null && values.size() == 1 ? values.get(0) : null;
}
final StringTuple inputRowTuple = StringTuple.create(inputDimensionValues);

int inputVsStart = inputRowTuple.compareTo(start);
int inputVsEnd = inputRowTuple.compareTo(end);

return (inputVsStart >= 0 || start == null)
&& (inputVsEnd < 0 || end == null);
}

@Override
public String getType()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.data.input.StringTuple;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Collections;
import java.util.Objects;

/**
* See {@link BucketNumberedShardSpec} for how this class is used.
*
* @see BuildingSingleDimensionShardSpec
*/
public class SingleDimensionRangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSingleDimensionShardSpec>
public class SingleDimensionRangeBucketShardSpec extends BaseDimensionRangeShardSpec
implements BucketNumberedShardSpec<BuildingSingleDimensionShardSpec>
{
private final int bucketId;
private final String dimension;
Expand All @@ -50,6 +50,11 @@ public SingleDimensionRangeBucketShardSpec(
@JsonProperty("end") @Nullable String end
)
{
super(
dimension == null ? Collections.emptyList() : Collections.singletonList(dimension),
start == null ? null : StringTuple.create(start),
end == null ? null : StringTuple.create(end)
);
this.bucketId = bucketId;
this.dimension = dimension;
this.start = start;
Expand Down Expand Up @@ -89,24 +94,6 @@ public BuildingSingleDimensionShardSpec convert(int partitionId)
return new BuildingSingleDimensionShardSpec(bucketId, dimension, start, end, partitionId);
}

public boolean isInChunk(InputRow inputRow)
{
return SingleDimensionShardSpec.isInChunk(dimension, start, end, inputRow);
}

@Override
public ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs)
{
return (long timestamp, InputRow row) -> {
for (ShardSpec spec : shardSpecs) {
if (((SingleDimensionRangeBucketShardSpec) spec).isInChunk(row)) {
return spec;
}
}
throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
};
}

@Override
public String getType()
{
Expand Down
Loading

0 comments on commit a22d413

Please sign in to comment.