From a22d4137250b7f074fe596396755494905617e99 Mon Sep 17 00:00:00 2001 From: hqx871 Date: Sat, 16 Apr 2022 00:07:06 +0800 Subject: [PATCH] Use binary search to improve DimensionRangeShardSpec lookup (#12417) If there are many shards, the mapper of IndexGeneratorJob seems to spend a lot of time calling DimensionRangeShardSpec.isInChunk to look up the target shard. This can be significantly improved by using binary search instead of comparing an input row to every shardSpec. Changes: * Add `BaseDimensionRangeShardSpec`, which provides a binary-search-based implementation for `createLookup` * `DimensionRangeShardSpec`, `SingleDimensionShardSpec`, `DimensionRangeBucketShardSpec`, and `SingleDimensionRangeBucketShardSpec` now extend `BaseDimensionRangeShardSpec` --- .../BaseDimensionRangeShardSpec.java | 103 ++++++ .../DimensionRangeBucketShardSpec.java | 32 +- .../partition/DimensionRangeShardSpec.java | 58 +--- .../SingleDimensionRangeBucketShardSpec.java | 31 +- .../partition/SingleDimensionShardSpec.java | 58 ---- .../DimensionRangeShardSpecTest.java | 328 ++++-------------- ...ngleDimensionRangeBucketShardSpecTest.java | 6 +- .../SingleDimensionShardSpecTest.java | 168 ++++----- 8 files changed, 272 insertions(+), 512 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java diff --git a/core/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java new file mode 100644 index 000000000000..9d5abd6f76e6 --- /dev/null +++ b/core/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.timeline.partition; + +import com.google.common.collect.Ordering; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.StringTuple; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.guava.Comparators; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +public abstract class BaseDimensionRangeShardSpec implements ShardSpec +{ + protected final List dimensions; + @Nullable + protected final StringTuple start; + @Nullable + protected final StringTuple end; + + protected BaseDimensionRangeShardSpec( + List dimensions, + @Nullable StringTuple start, + @Nullable StringTuple end + ) + { + this.dimensions = dimensions; + this.start = start; + this.end = end; + } + + @Override + public ShardSpecLookup getLookup(final List shardSpecs) + { + return createLookup(dimensions, shardSpecs); + } + + private static ShardSpecLookup createLookup(List dimensions, List shardSpecs) + { + BaseDimensionRangeShardSpec[] rangeShardSpecs = new BaseDimensionRangeShardSpec[shardSpecs.size()]; + for (int i = 0; i < shardSpecs.size(); i++) { + rangeShardSpecs[i] = (BaseDimensionRangeShardSpec) shardSpecs.get(i); + } + final Comparator startComparator = Comparators.naturalNullsFirst(); + final Comparator endComparator = Ordering.natural().nullsLast(); 
+ + final Comparator shardSpecComparator = Comparator + .comparing((BaseDimensionRangeShardSpec spec) -> spec.start, startComparator) + .thenComparing(spec -> spec.end, endComparator); + + Arrays.sort(rangeShardSpecs, shardSpecComparator); + + return (long timestamp, InputRow row) -> { + StringTuple inputRowTuple = getInputRowTuple(dimensions, row); + int startIndex = 0; + int endIndex = shardSpecs.size() - 1; + while (startIndex <= endIndex) { + int mid = (startIndex + endIndex) >>> 1; + BaseDimensionRangeShardSpec rangeShardSpec = rangeShardSpecs[mid]; + if (startComparator.compare(inputRowTuple, rangeShardSpec.start) < 0) { + endIndex = mid - 1; + } else if (endComparator.compare(inputRowTuple, rangeShardSpec.end) < 0) { + return rangeShardSpec; + } else { + startIndex = mid + 1; + } + } + throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs); + }; + } + + protected static StringTuple getInputRowTuple(List dimensions, InputRow inputRow) + { + final String[] inputDimensionValues = new String[dimensions.size()]; + for (int i = 0; i < dimensions.size(); ++i) { + // Get the values of this dimension, treat multiple values as null + List values = inputRow.getDimension(dimensions.get(i)); + inputDimensionValues[i] = values != null && values.size() == 1 ? 
values.get(0) : null; + } + + return StringTuple.create(inputDimensionValues); + } +} diff --git a/core/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java index bfeb75247b6a..6d187eef8ae5 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java @@ -22,9 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.StringTuple; -import org.apache.druid.java.util.common.ISE; import javax.annotation.Nullable; import java.util.List; @@ -40,14 +38,10 @@ * @see BuildingSingleDimensionShardSpec * @see BuildingDimensionRangeShardSpec */ -public class DimensionRangeBucketShardSpec implements BucketNumberedShardSpec +public class DimensionRangeBucketShardSpec extends BaseDimensionRangeShardSpec + implements BucketNumberedShardSpec { private final int bucketId; - private final List dimensions; - @Nullable - private final StringTuple start; - @Nullable - private final StringTuple end; @JsonCreator public DimensionRangeBucketShardSpec( @@ -57,6 +51,7 @@ public DimensionRangeBucketShardSpec( @JsonProperty("end") @Nullable StringTuple end ) { + super(dimensions, start, end); // Verify that the tuple sizes and number of dimensions are the same Preconditions.checkArgument( start == null || start.size() == dimensions.size(), @@ -68,9 +63,6 @@ public DimensionRangeBucketShardSpec( ); this.bucketId = bucketId; - this.dimensions = dimensions; - this.start = start; - this.end = end; } @Override @@ -119,24 +111,6 @@ public BuildingDimensionRangeShardSpec convert(int partitionId) ); } - @Override - public ShardSpecLookup getLookup(List shardSpecs) - 
{ - return (long timestamp, InputRow row) -> { - for (ShardSpec spec : shardSpecs) { - if (((DimensionRangeBucketShardSpec) spec).isInChunk(row)) { - return spec; - } - } - throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs); - }; - } - - private boolean isInChunk(InputRow inputRow) - { - return DimensionRangeShardSpec.isInChunk(dimensions, start, end, inputRow); - } - @Override public String getType() { diff --git a/core/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java index 058303346078..543931a8882d 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java @@ -25,9 +25,7 @@ import com.google.common.collect.Range; import com.google.common.collect.RangeSet; import com.google.common.collect.TreeRangeSet; -import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.StringTuple; -import org.apache.druid.java.util.common.ISE; import javax.annotation.Nullable; import java.util.Collections; @@ -38,15 +36,10 @@ /** * {@link ShardSpec} for partitioning based on ranges of one or more dimensions. 
*/ -public class DimensionRangeShardSpec implements ShardSpec +public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec { public static final int UNKNOWN_NUM_CORE_PARTITIONS = -1; - private final List dimensions; - @Nullable - private final StringTuple start; - @Nullable - private final StringTuple end; private final int partitionNum; private final int numCorePartitions; @@ -65,15 +58,13 @@ public DimensionRangeShardSpec( @JsonProperty("numCorePartitions") @Nullable Integer numCorePartitions // nullable for backward compatibility ) { + super(dimensions, start, end); Preconditions.checkArgument(partitionNum >= 0, "partitionNum >= 0"); Preconditions.checkArgument( dimensions != null && !dimensions.isEmpty(), "dimensions should be non-null and non-empty" ); - this.dimensions = dimensions; - this.start = start; - this.end = end; this.partitionNum = partitionNum; this.numCorePartitions = numCorePartitions == null ? UNKNOWN_NUM_CORE_PARTITIONS : numCorePartitions; } @@ -117,24 +108,6 @@ public boolean isNumCorePartitionsUnknown() return numCorePartitions == UNKNOWN_NUM_CORE_PARTITIONS; } - @Override - public ShardSpecLookup getLookup(final List shardSpecs) - { - return createLookup(shardSpecs); - } - - private static ShardSpecLookup createLookup(List shardSpecs) - { - return (long timestamp, InputRow row) -> { - for (ShardSpec spec : shardSpecs) { - if (((DimensionRangeShardSpec) spec).isInChunk(row)) { - return spec; - } - } - throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs); - }; - } - @Override public List getDomainDimensions() { @@ -279,33 +252,6 @@ public PartitionChunk createChunk(T obj) } } - private boolean isInChunk(InputRow inputRow) - { - return isInChunk(dimensions, start, end, inputRow); - } - - public static boolean isInChunk( - List dimensions, - @Nullable StringTuple start, - @Nullable StringTuple end, - InputRow inputRow - ) - { - final String[] inputDimensionValues = new String[dimensions.size()]; - for (int i = 0; 
i < dimensions.size(); ++i) { - // Get the values of this dimension, treat multiple values as null - List values = inputRow.getDimension(dimensions.get(i)); - inputDimensionValues[i] = values != null && values.size() == 1 ? values.get(0) : null; - } - final StringTuple inputRowTuple = StringTuple.create(inputDimensionValues); - - int inputVsStart = inputRowTuple.compareTo(start); - int inputVsEnd = inputRowTuple.compareTo(end); - - return (inputVsStart >= 0 || start == null) - && (inputVsEnd < 0 || end == null); - } - @Override public String getType() { diff --git a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java index b90d8b87976e..e47b5aafaeae 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java @@ -21,11 +21,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.java.util.common.ISE; +import org.apache.druid.data.input.StringTuple; import javax.annotation.Nullable; -import java.util.List; +import java.util.Collections; import java.util.Objects; /** @@ -33,7 +32,8 @@ * * @see BuildingSingleDimensionShardSpec */ -public class SingleDimensionRangeBucketShardSpec implements BucketNumberedShardSpec +public class SingleDimensionRangeBucketShardSpec extends BaseDimensionRangeShardSpec + implements BucketNumberedShardSpec { private final int bucketId; private final String dimension; @@ -50,6 +50,11 @@ public SingleDimensionRangeBucketShardSpec( @JsonProperty("end") @Nullable String end ) { + super( + dimension == null ? Collections.emptyList() : Collections.singletonList(dimension), + start == null ? 
null : StringTuple.create(start), + end == null ? null : StringTuple.create(end) + ); this.bucketId = bucketId; this.dimension = dimension; this.start = start; @@ -89,24 +94,6 @@ public BuildingSingleDimensionShardSpec convert(int partitionId) return new BuildingSingleDimensionShardSpec(bucketId, dimension, start, end, partitionId); } - public boolean isInChunk(InputRow inputRow) - { - return SingleDimensionShardSpec.isInChunk(dimension, start, end, inputRow); - } - - @Override - public ShardSpecLookup getLookup(List shardSpecs) - { - return (long timestamp, InputRow row) -> { - for (ShardSpec spec : shardSpecs) { - if (((SingleDimensionRangeBucketShardSpec) spec).isInChunk(row)) { - return spec; - } - } - throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs); - }; - } - @Override public String getType() { diff --git a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java index 700af53a840e..a8ebbe8dfc86 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java @@ -22,17 +22,13 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonValue; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Range; import com.google.common.collect.RangeSet; -import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.StringTuple; -import org.apache.druid.java.util.common.ISE; import javax.annotation.Nullable; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; @@ -114,24 +110,6 @@ public String getEnd() return end; } - @Override - public ShardSpecLookup getLookup(final List shardSpecs) - { - 
return createLookup(shardSpecs); - } - - static ShardSpecLookup createLookup(List shardSpecs) - { - return (long timestamp, InputRow row) -> { - for (ShardSpec spec : shardSpecs) { - if (((SingleDimensionShardSpec) spec).isInChunk(row)) { - return spec; - } - } - throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs); - }; - } - private Range getRange() { Range range; @@ -167,42 +145,6 @@ public PartitionChunk createChunk(T obj) } } - @VisibleForTesting - boolean isInChunk(InputRow inputRow) - { - return isInChunk(dimension, start, end, inputRow); - } - - private static boolean checkValue(@Nullable String start, @Nullable String end, String value) - { - if (value == null) { - return start == null; - } - - if (start == null) { - return end == null || value.compareTo(end) < 0; - } - - return value.compareTo(start) >= 0 && - (end == null || value.compareTo(end) < 0); - } - - public static boolean isInChunk( - String dimension, - @Nullable String start, - @Nullable String end, - InputRow inputRow - ) - { - final List values = inputRow.getDimension(dimension); - - if (values == null || values.size() != 1) { - return checkValue(start, end, null); - } else { - return checkValue(start, end, values.get(0)); - } - } - @Override public String getType() { diff --git a/core/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java index d9e912e0fc95..0e29057a5426 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java +++ b/core/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java @@ -19,6 +19,7 @@ package org.apache.druid.timeline.partition; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Range; import com.google.common.collect.RangeSet; import com.google.common.collect.TreeRangeSet; @@ -26,6 +27,7 @@ import 
org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.StringTuple; import org.apache.druid.java.util.common.DateTimes; +import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; @@ -43,261 +45,95 @@ public class DimensionRangeShardSpecTest private final List dimensions = new ArrayList<>(); @Test - public void testIsInChunk() + public void testShardSpecLookup() { - setDimensions("d1", "d2"); - - final DimensionRangeShardSpec shardSpec = new DimensionRangeShardSpec( - dimensions, - StringTuple.create("India", "Delhi"), - StringTuple.create("Spain", "Valencia"), - 10, - null + setDimensions("dim1", "dim2"); + + final List shardSpecs = ImmutableList.of( + new DimensionRangeShardSpec(dimensions, null, StringTuple.create("India", "Delhi"), 1, 1), + new DimensionRangeShardSpec( + dimensions, + StringTuple.create("India", "Delhi"), + StringTuple.create("Spain", "Valencia"), + 2, + 1 + ), + new DimensionRangeShardSpec(dimensions, StringTuple.create("Spain", "Valencia"), null, 3, 1) ); - - // Verify that entries starting from (India, Delhi) until (Spain, Valencia) are in chunk - assertTrue(isInChunk( - shardSpec, - createRow("India", "Delhi") - )); - assertTrue(isInChunk( - shardSpec, - createRow("India", "Kolkata") - )); - assertTrue(isInChunk( - shardSpec, - createRow("Japan", "Tokyo") - )); - assertTrue(isInChunk( - shardSpec, - createRow("Spain", "Barcelona") - )); - - assertFalse(isInChunk( - shardSpec, - createRow("India", "Bengaluru") - )); - assertFalse(isInChunk( - shardSpec, - createRow("Spain", "Valencia") - )); - assertFalse(isInChunk( - shardSpec, - createRow("United Kingdom", "London") - )); - } - - @Test - public void testIsInChunk_withNullStart() - { - setDimensions("d1", "d2"); - - final DimensionRangeShardSpec shardSpec = new DimensionRangeShardSpec( - dimensions, - null, - StringTuple.create("Spain", "Valencia"), - 10, - null + final ShardSpecLookup lookup = shardSpecs.get(0).getLookup(shardSpecs); + final long 
currentTime = DateTimes.nowUtc().getMillis(); + + Assert.assertEquals( + shardSpecs.get(0), + lookup.getShardSpec( + currentTime, + createRow("France", "Paris") + ) ); - // Verify that anything before (Spain, Valencia) is in chunk - assertTrue(isInChunk( - shardSpec, - createRow(null, null) - )); - assertTrue(isInChunk( - shardSpec, - createRow(null, "Lyon") - )); - assertTrue(isInChunk( - shardSpec, - createRow("India", null) - )); - assertTrue(isInChunk( - shardSpec, - createRow("India", "Kolkata") - )); - assertTrue(isInChunk( - shardSpec, - createRow("Japan", "Tokyo") - )); - assertTrue(isInChunk( - shardSpec, - createRow("Spain", "Barcelona") - )); - - assertFalse(isInChunk( - shardSpec, - createRow("Spain", "Valencia") - )); - assertFalse(isInChunk( - shardSpec, - createRow("United Kingdom", "London") - )); - } - - @Test - public void testIsInChunk_withNullEnd() - { - setDimensions("d1", "d2"); - - final DimensionRangeShardSpec shardSpec = new DimensionRangeShardSpec( - dimensions, - StringTuple.create("France", "Lyon"), - null, - 10, - null + Assert.assertEquals( + shardSpecs.get(0), + lookup.getShardSpec( + currentTime, + createRow("India", null) + ) ); - // Verify that anything starting from (France, Lyon) is in chunk - assertTrue(isInChunk( - shardSpec, - createRow("France", "Paris") - )); - assertTrue(isInChunk( - shardSpec, - createRow("Japan", "Tokyo") - )); - assertTrue(isInChunk( - shardSpec, - createRow("Spain", null) - )); - - assertFalse(isInChunk( - shardSpec, - createRow(null, null) - )); - assertFalse(isInChunk( - shardSpec, - createRow("France", null) - )); - assertFalse(isInChunk( - shardSpec, - createRow("France", "Bordeaux") - )); - } + Assert.assertEquals( + shardSpecs.get(0), + lookup.getShardSpec( + currentTime, + createRow(null, null) + ) + ); - @Test - public void testIsInChunk_withFirstDimEqual() - { - setDimensions("d1", "d2"); - - final DimensionRangeShardSpec shardSpec = new DimensionRangeShardSpec( - dimensions, - 
StringTuple.create("France", "Bordeaux"), - StringTuple.create("France", "Paris"), - 10, - null + Assert.assertEquals( + shardSpecs.get(1), + lookup.getShardSpec( + currentTime, + createRow("India", "Delhi") + ) ); - // Verify that entries starting from (India, Bengaluru) until (India, Patna) are in chunk - assertTrue(isInChunk( - shardSpec, - createRow("France", "Bordeaux") - )); - assertTrue(isInChunk( - shardSpec, - createRow("France", "Lyon") - )); - - assertFalse(isInChunk( - shardSpec, - createRow("France", "Paris") - )); - assertFalse(isInChunk( - shardSpec, - createRow("France", "Avignon") - )); - assertFalse(isInChunk( - shardSpec, - createRow("France", "Toulouse") - )); - } + Assert.assertEquals( + shardSpecs.get(1), + lookup.getShardSpec( + currentTime, + createRow("India", "Kolkata") + ) + ); - @Test - public void testIsInChunk_withSingleDimension() - { - setDimensions("d1"); - - final DimensionRangeShardSpec shardSpec = new DimensionRangeShardSpec( - dimensions, - StringTuple.create("India"), - StringTuple.create("Spain"), - 10, - null + Assert.assertEquals( + shardSpecs.get(1), + lookup.getShardSpec( + currentTime, + createRow("Spain", null) + ) ); - // Verify that entries starting from (India) until (Spain) are in chunk - assertTrue(isInChunk( - shardSpec, - createRow("India") - )); - assertTrue(isInChunk( - shardSpec, - createRow("Japan") - )); - assertTrue(isInChunk( - shardSpec, - createRow("Malaysia") - )); - - assertFalse(isInChunk( - shardSpec, - createRow("Belgium") - )); - assertFalse(isInChunk( - shardSpec, - createRow("Spain") - )); - assertFalse(isInChunk( - shardSpec, - createRow("United Kingdom") - )); - } + Assert.assertEquals( + shardSpecs.get(2), + lookup.getShardSpec( + currentTime, + createRow("Spain", "Valencia") + ) + ); - @Test - public void testIsInChunk_withMultiValues() - { - setDimensions("d1", "d2"); - - final DimensionRangeShardSpec shardSpec = new DimensionRangeShardSpec( - dimensions, - StringTuple.create("India", 
"Delhi"), - StringTuple.create("Spain", "Valencia"), - 10, - null + Assert.assertEquals( + shardSpecs.get(2), + lookup.getShardSpec( + currentTime, + createRow("United Kingdom", "London") + ) ); - // Verify that entries starting from (India, Delhi) until (Spain, Valencia) are in chunk - assertTrue(isInChunk( - shardSpec, - createRow("India", "Delhi") - )); - assertTrue(isInChunk( - shardSpec, - createRow("India", "Kolkata") - )); - assertTrue(isInChunk( - shardSpec, - createRow("Japan", "Tokyo") - )); - assertTrue(isInChunk( - shardSpec, - createRow("Spain", "Barcelona") - )); - - assertFalse(isInChunk( - shardSpec, - createRow("India", "Bengaluru") - )); - assertFalse(isInChunk( - shardSpec, - createRow("Spain", "Valencia") - )); - assertFalse(isInChunk( - shardSpec, - createRow("United Kingdom", "London") - )); + Assert.assertEquals( + shardSpecs.get(2), + lookup.getShardSpec( + currentTime, + createRow("United Kingdom", null) + ) + ); } @Test @@ -607,18 +443,6 @@ private void populateDomain(Map> domain, domain.put("city", citySet); } - /** - * Checks if the given InputRow is in the chunk represented by the given shard spec. - */ - private boolean isInChunk(DimensionRangeShardSpec shardSpec, InputRow row) - { - return DimensionRangeShardSpec.isInChunk( - shardSpec.getDimensions(), - shardSpec.getStartTuple(), - shardSpec.getEndTuple(), - row - ); - } private void setDimensions(String... 
dimensionNames) { diff --git a/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpecTest.java index acecc649c12f..1393ba87031a 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpecTest.java +++ b/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpecTest.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import nl.jqno.equalsverifier.EqualsVerifier; +import nl.jqno.equalsverifier.Warning; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.java.util.common.DateTimes; import org.junit.Assert; @@ -113,6 +114,9 @@ public void testSerde() throws JsonProcessingException @Test public void testEquals() { - EqualsVerifier.forClass(SingleDimensionRangeBucketShardSpec.class).usingGetClass().verify(); + EqualsVerifier.forClass(SingleDimensionRangeBucketShardSpec.class) + .usingGetClass() + .suppress(Warning.ALL_FIELDS_SHOULD_BE_USED) + .verify(); } } diff --git a/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionShardSpecTest.java index 9b14450e2aa0..55235d62fbec 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionShardSpecTest.java +++ b/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionShardSpecTest.java @@ -21,22 +21,19 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableRangeSet; -import com.google.common.collect.Maps; import com.google.common.collect.Range; 
import com.google.common.collect.RangeSet; -import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; -import org.apache.druid.java.util.common.Pair; -import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.DateTimes; import org.junit.Assert; import org.junit.Test; import java.io.IOException; -import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -46,75 +43,6 @@ public class SingleDimensionShardSpecTest { private final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - @Test - public void testIsInChunk() - { - Map>>> tests = ImmutableMap.>>>builder() - .put( - makeSpec(null, null), - makeListOfPairs( - true, null, - true, "a", - true, "h", - true, "p", - true, "y" - ) - ) - .put( - makeSpec(null, "m"), - makeListOfPairs( - true, null, - true, "a", - true, "h", - false, "p", - false, "y" - ) - ) - .put( - makeSpec("a", "h"), - makeListOfPairs( - false, null, - true, "a", - false, "h", - false, "p", - false, "y" - ) - ) - .put( - makeSpec("d", "u"), - makeListOfPairs( - false, null, - false, "a", - true, "h", - true, "p", - false, "y" - ) - ) - .put( - makeSpec("h", null), - makeListOfPairs( - false, null, - false, "a", - true, "h", - true, "p", - true, "y" - ) - ) - .build(); - - for (Map.Entry>>> entry : tests.entrySet()) { - SingleDimensionShardSpec spec = entry.getKey(); - for (Pair> pair : entry.getValue()) { - final InputRow inputRow = new MapBasedInputRow( - 0, - ImmutableList.of("billy"), - Maps.transformValues(pair.rhs, input -> input) - ); - Assert.assertEquals(StringUtils.format("spec[%s], row[%s]", spec, inputRow), pair.lhs, spec.isInChunk(inputRow)); - } - } - } - @Test public void testPossibleInDomain() { @@ -184,6 +112,77 @@ public void testDeserialize() throws JsonProcessingException ); } + @Test + public void testShardSpecLookup() + { + final List shardSpecs = ImmutableList.of( + new 
SingleDimensionShardSpec("dim", null, "c", 1, 1), + new SingleDimensionShardSpec("dim", "c", "h", 2, 1), + new SingleDimensionShardSpec("dim", "h", null, 3, 1) + ); + final ShardSpecLookup lookup = shardSpecs.get(0).getLookup(shardSpecs); + final long currentTime = DateTimes.nowUtc().getMillis(); + Assert.assertEquals( + shardSpecs.get(0), + lookup.getShardSpec( + currentTime, + new MapBasedInputRow( + currentTime, + Collections.singletonList("dim"), + ImmutableMap.of("dim", "a", "time", currentTime) + ) + ) + ); + + Assert.assertEquals( + shardSpecs.get(0), + lookup.getShardSpec( + currentTime, + new MapBasedInputRow( + currentTime, + Collections.singletonList("dim"), + ImmutableMap.of("time", currentTime) + ) + ) + ); + + Assert.assertEquals( + shardSpecs.get(0), + lookup.getShardSpec( + currentTime, + new MapBasedInputRow( + currentTime, + Collections.singletonList("dim"), + ImmutableMap.of("dim", Arrays.asList("a", "b"), "time", currentTime) + ) + ) + ); + + Assert.assertEquals( + shardSpecs.get(1), + lookup.getShardSpec( + currentTime, + new MapBasedInputRow( + currentTime, + Collections.singletonList("dim"), + ImmutableMap.of("dim", "g", "time", currentTime) + ) + ) + ); + + Assert.assertEquals( + shardSpecs.get(2), + lookup.getShardSpec( + currentTime, + new MapBasedInputRow( + currentTime, + Collections.singletonList("dim"), + ImmutableMap.of("dim", "k", "time", currentTime) + ) + ) + ); + } + private void testSerde(SingleDimensionShardSpec shardSpec) throws IOException { String json = OBJECT_MAPPER.writeValueAsString(shardSpec); @@ -209,23 +208,4 @@ private SingleDimensionShardSpec makeSpec(String dimension, String start, String { return new SingleDimensionShardSpec(dimension, start, end, 0, SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS); } - - private Map makeMap(String value) - { - return value == null ? ImmutableMap.of() : ImmutableMap.of("billy", value); - } - - private List>> makeListOfPairs(Object... 
arguments) - { - Preconditions.checkState(arguments.length % 2 == 0); - - final ArrayList>> retVal = new ArrayList<>(); - - for (int i = 0; i < arguments.length; i += 2) { - retVal.add(Pair.of((Boolean) arguments[i], makeMap((String) arguments[i + 1]))); - } - - return retVal; - } - }