Skip to content

Commit

Permalink
Merge branch 'master' into patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
pjfanning committed Apr 18, 2022
2 parents 2b370c1 + c25a556 commit cba9dd5
Show file tree
Hide file tree
Showing 82 changed files with 3,222 additions and 1,280 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static <T extends Enum<T>> T getEnumIfPresent(final Class<T> enumClass, f

/**
* If first argument is not null, return it, else return the other argument. Sort of like
* {@link com.google.common.base.Objects#firstNonNull(Object, Object)} except will not explode if both arguments are
* {@link com.google.common.base.Objects#firstNonNull(T, T)} except will not explode if both arguments are
* null.
*/
@Nullable
Expand All @@ -85,7 +85,8 @@ public static <T> T firstNonNull(@Nullable T arg1, @Nullable T arg2)

/**
* Cancel futures manually, because sometime we can't cancel all futures in {@link com.google.common.util.concurrent.Futures.CombinedFuture}
* automatically. Especially when we call {@link com.google.common.util.concurrent.Futures#allAsList(Iterable)} to create a batch of
* automatically. Especially when we call
* {@link static <V> ListenableFuture<List<V>> com.google.common.util.concurrent.Futures#allAsList(Iterable<? extends ListenableFuture <? extends V>> futures)} to create a batch of
* future.
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,5 @@ default Predicate<Throwable> getRetryCondition()
{
return Predicates.alwaysFalse();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ public JsonInputFormat(
super(flattenSpec);
this.featureSpec = featureSpec == null ? Collections.emptyMap() : featureSpec;
this.objectMapper = new ObjectMapper();
this.keepNullColumns = keepNullColumns == null ? false : keepNullColumns;
if (keepNullColumns != null) {
this.keepNullColumns = keepNullColumns;
} else {
this.keepNullColumns = flattenSpec != null && flattenSpec.isUseFieldDiscovery();
}
for (Entry<String, Boolean> entry : this.featureSpec.entrySet()) {
Feature feature = Feature.valueOf(entry.getKey());
objectMapper.configure(feature, entry.getValue());
Expand All @@ -88,6 +92,12 @@ public Map<String, Boolean> getFeatureSpec()
return featureSpec;
}

@JsonProperty
public boolean isKeepNullColumns()
{
return keepNullColumns;
}

@Override
public boolean isSplittable()
{
Expand Down
49 changes: 32 additions & 17 deletions core/src/main/java/org/apache/druid/math/expr/ExprEval.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,19 @@ public static NonnullPair<ExpressionType, Object[]> coerceListToArray(@Nullable
if (coercedType == Long.class || coercedType == Integer.class) {
return new NonnullPair<>(
ExpressionType.LONG_ARRAY,
val.stream().map(x -> x != null ? ((Number) x).longValue() : null).toArray()
val.stream().map(x -> x != null ? ExprEval.ofType(ExpressionType.LONG, x).value() : null).toArray()
);
}
if (coercedType == Float.class || coercedType == Double.class) {
return new NonnullPair<>(
ExpressionType.DOUBLE_ARRAY,
val.stream().map(x -> x != null ? ((Number) x).doubleValue() : null).toArray()
val.stream().map(x -> x != null ? ExprEval.ofType(ExpressionType.DOUBLE, x).value() : null).toArray()
);
}
// default to string
return new NonnullPair<>(
ExpressionType.STRING_ARRAY,
val.stream().map(x -> x != null ? x.toString() : null).toArray()
val.stream().map(x -> x != null ? ExprEval.ofType(ExpressionType.STRING, x).value() : null).toArray()
);
}
if (homogenizeMultiValueStrings) {
Expand Down Expand Up @@ -194,7 +194,7 @@ public static ExpressionType findArrayType(@Nullable Object[] val)
*/
private static Class convertType(@Nullable Class existing, Class next)
{
if (Number.class.isAssignableFrom(next) || next == String.class) {
if (Number.class.isAssignableFrom(next) || next == String.class || next == Boolean.class) {
if (existing == null) {
return next;
}
Expand Down Expand Up @@ -348,6 +348,12 @@ public static ExprEval bestEffortOf(@Nullable Object val)
}
return new LongExprEval((Number) val);
}
if (val instanceof Boolean) {
if (ExpressionProcessing.useStrictBooleans()) {
return ofLongBoolean((Boolean) val);
}
return new StringExprEval(String.valueOf(val));
}
if (val instanceof Long[]) {
return new ArrayExprEval(ExpressionType.LONG_ARRAY, (Long[]) val);
}
Expand All @@ -360,20 +366,13 @@ public static ExprEval bestEffortOf(@Nullable Object val)
if (val instanceof String[]) {
return new ArrayExprEval(ExpressionType.STRING_ARRAY, (String[]) val);
}
if (val instanceof Object[]) {
ExpressionType arrayType = findArrayType((Object[]) val);
if (arrayType != null) {
return new ArrayExprEval(arrayType, (Object[]) val);
}
// default to string if array is empty
return new ArrayExprEval(ExpressionType.STRING_ARRAY, (Object[]) val);
}

if (val instanceof List) {
if (val instanceof List || val instanceof Object[]) {
final List<?> theList = val instanceof List ? ((List<?>) val) : Arrays.asList((Object[]) val);
// do not convert empty lists to arrays with a single null element here, because that should have been done
// by the selectors preparing their ObjectBindings if necessary. If we get to this point it was legitimately
// empty
NonnullPair<ExpressionType, Object[]> coerced = coerceListToArray((List<?>) val, false);
NonnullPair<ExpressionType, Object[]> coerced = coerceListToArray(theList, false);
if (coerced == null) {
return bestEffortOf(null);
}
Expand All @@ -400,7 +399,7 @@ public static ExprEval ofType(@Nullable ExpressionType type, @Nullable Object va
return new ArrayExprEval(ExpressionType.STRING_ARRAY, (String[]) value);
}
if (value instanceof Object[]) {
return new ArrayExprEval(ExpressionType.STRING_ARRAY, (Object[]) value);
return bestEffortOf(value);
}
if (value instanceof List) {
return bestEffortOf(value);
Expand All @@ -413,6 +412,9 @@ public static ExprEval ofType(@Nullable ExpressionType type, @Nullable Object va
if (value instanceof Number) {
return ofLong((Number) value);
}
if (value instanceof Boolean) {
return ofLongBoolean((Boolean) value);
}
if (value instanceof String) {
return ofLong(ExprEval.computeNumber((String) value));
}
Expand All @@ -421,6 +423,12 @@ public static ExprEval ofType(@Nullable ExpressionType type, @Nullable Object va
if (value instanceof Number) {
return ofDouble((Number) value);
}
if (value instanceof Boolean) {
if (ExpressionProcessing.useStrictBooleans()) {
return ofLongBoolean((Boolean) value);
}
return ofDouble(Evals.asDouble((Boolean) value));
}
if (value instanceof String) {
return ofDouble(ExprEval.computeNumber((String) value));
}
Expand All @@ -442,13 +450,14 @@ public static ExprEval ofType(@Nullable ExpressionType type, @Nullable Object va

return ofComplex(type, value);
case ARRAY:
if (value instanceof Object[]) {
// nested arrays, here be dragons... don't do any fancy coercion, assume everything is already sane types...
if (type.getElementType().isArray()) {
return ofArray(type, (Object[]) value);
}
// in a better world, we might get an object that matches the type signature for arrays and could do a switch
// statement here, but this is not that world yet, and things that are array typed might also be non-arrays,
// e.g. we might get a String instead of String[], so just fallback to bestEffortOf
return bestEffortOf(value);
return bestEffortOf(value).castTo(type);
}
throw new IAE("Cannot create type [%s]", type);
}
Expand All @@ -459,6 +468,12 @@ public static Number computeNumber(@Nullable String value)
if (value == null) {
return null;
}
if (Evals.asBoolean(value)) {
return 1.0;
}
if (value.equalsIgnoreCase("false")) {
return 0.0;
}
Number rv;
Long v = GuavaUtils.tryParseLong(value);
// Do NOT use ternary operator here, because it makes Java to convert Long to Double
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.timeline.partition;

import com.google.common.collect.Ordering;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Comparators;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public abstract class BaseDimensionRangeShardSpec implements ShardSpec
{
protected final List<String> dimensions;
@Nullable
protected final StringTuple start;
@Nullable
protected final StringTuple end;

protected BaseDimensionRangeShardSpec(
List<String> dimensions,
@Nullable StringTuple start,
@Nullable StringTuple end
)
{
this.dimensions = dimensions;
this.start = start;
this.end = end;
}

@Override
public ShardSpecLookup getLookup(final List<? extends ShardSpec> shardSpecs)
{
return createLookup(dimensions, shardSpecs);
}

private static ShardSpecLookup createLookup(List<String> dimensions, List<? extends ShardSpec> shardSpecs)
{
BaseDimensionRangeShardSpec[] rangeShardSpecs = new BaseDimensionRangeShardSpec[shardSpecs.size()];
for (int i = 0; i < shardSpecs.size(); i++) {
rangeShardSpecs[i] = (BaseDimensionRangeShardSpec) shardSpecs.get(i);
}
final Comparator<StringTuple> startComparator = Comparators.naturalNullsFirst();
final Comparator<StringTuple> endComparator = Ordering.natural().nullsLast();

final Comparator<BaseDimensionRangeShardSpec> shardSpecComparator = Comparator
.comparing((BaseDimensionRangeShardSpec spec) -> spec.start, startComparator)
.thenComparing(spec -> spec.end, endComparator);

Arrays.sort(rangeShardSpecs, shardSpecComparator);

return (long timestamp, InputRow row) -> {
StringTuple inputRowTuple = getInputRowTuple(dimensions, row);
int startIndex = 0;
int endIndex = shardSpecs.size() - 1;
while (startIndex <= endIndex) {
int mid = (startIndex + endIndex) >>> 1;
BaseDimensionRangeShardSpec rangeShardSpec = rangeShardSpecs[mid];
if (startComparator.compare(inputRowTuple, rangeShardSpec.start) < 0) {
endIndex = mid - 1;
} else if (endComparator.compare(inputRowTuple, rangeShardSpec.end) < 0) {
return rangeShardSpec;
} else {
startIndex = mid + 1;
}
}
throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
};
}

protected static StringTuple getInputRowTuple(List<String> dimensions, InputRow inputRow)
{
final String[] inputDimensionValues = new String[dimensions.size()];
for (int i = 0; i < dimensions.size(); ++i) {
// Get the values of this dimension, treat multiple values as null
List<String> values = inputRow.getDimension(dimensions.get(i));
inputDimensionValues[i] = values != null && values.size() == 1 ? values.get(0) : null;
}

return StringTuple.create(inputDimensionValues);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.ISE;

import javax.annotation.Nullable;
import java.util.List;
Expand All @@ -40,14 +38,10 @@
* @see BuildingSingleDimensionShardSpec
* @see BuildingDimensionRangeShardSpec
*/
public class DimensionRangeBucketShardSpec implements BucketNumberedShardSpec<BuildingDimensionRangeShardSpec>
public class DimensionRangeBucketShardSpec extends BaseDimensionRangeShardSpec
implements BucketNumberedShardSpec<BuildingDimensionRangeShardSpec>
{
private final int bucketId;
private final List<String> dimensions;
@Nullable
private final StringTuple start;
@Nullable
private final StringTuple end;

@JsonCreator
public DimensionRangeBucketShardSpec(
Expand All @@ -57,6 +51,7 @@ public DimensionRangeBucketShardSpec(
@JsonProperty("end") @Nullable StringTuple end
)
{
super(dimensions, start, end);
// Verify that the tuple sizes and number of dimensions are the same
Preconditions.checkArgument(
start == null || start.size() == dimensions.size(),
Expand All @@ -68,9 +63,6 @@ public DimensionRangeBucketShardSpec(
);

this.bucketId = bucketId;
this.dimensions = dimensions;
this.start = start;
this.end = end;
}

@Override
Expand Down Expand Up @@ -119,24 +111,6 @@ public BuildingDimensionRangeShardSpec convert(int partitionId)
);
}

@Override
public ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs)
{
return (long timestamp, InputRow row) -> {
for (ShardSpec spec : shardSpecs) {
if (((DimensionRangeBucketShardSpec) spec).isInChunk(row)) {
return spec;
}
}
throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
};
}

private boolean isInChunk(InputRow inputRow)
{
return DimensionRangeShardSpec.isInChunk(dimensions, start, end, inputRow);
}

@Override
public String getType()
{
Expand Down
Loading

0 comments on commit cba9dd5

Please sign in to comment.