Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into new_basic_sec2
Browse files Browse the repository at this point in the history
  • Loading branch information
jon-wei committed Dec 13, 2017
2 parents ff244cc + 3b4395a commit 08a4e8f
Show file tree
Hide file tree
Showing 106 changed files with 435 additions and 340 deletions.
5 changes: 5 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
language: java

# On 12-12-2017, Travis updated their trusty image, which caused integration tests to fail.
# The group: config instructs Travis to use the previous trusty image.
# Please see https://github.com/druid-io/druid/pull/5155 for more information.
sudo: required
dist: trusty
group: deprecated-2017Q4

jdk:
- oraclejdk8
Expand Down
4 changes: 4 additions & 0 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@
<groupId>net.thisptr</groupId>
<artifactId>jackson-jq</artifactId>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
Expand Down
139 changes: 104 additions & 35 deletions api/src/main/java/io/druid/timeline/DataSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,35 @@

package io.druid.timeline;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.base.Function;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import io.druid.guice.annotations.PublicApi;
import io.druid.jackson.CommaListJoinDeserializer;
import io.druid.jackson.CommaListJoinSerializer;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.SegmentDescriptor;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import it.unimi.dsi.fastutil.objects.Object2ObjectArrayMap;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
*/
Expand All @@ -52,15 +56,13 @@ public class DataSegment implements Comparable<DataSegment>
{
public static String delimiter = "_";
private final Integer binaryVersion;
private static final Interner<String> interner = Interners.newWeakInterner();
private static final Function<String, String> internFun = new Function<String, String>()
{
@Override
public String apply(String input)
{
return interner.intern(input);
}
};
private static final Interner<String> STRING_INTERNER = Interners.newWeakInterner();
private static final Interner<List<String>> DIMENSIONS_INTERNER = Interners.newWeakInterner();
private static final Interner<List<String>> METRICS_INTERNER = Interners.newWeakInterner();
private static final Map<String, Object> PRUNED_LOAD_SPEC = ImmutableMap.of(
"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space",
""
);

public static String makeDataSegmentIdentifier(
String dataSource,
Expand All @@ -84,50 +86,87 @@ public static String makeDataSegmentIdentifier(
return sb.toString();
}

/**
 * Holder for the optionally injected {@code pruneLoadSpec} flag. A separate class is needed because the flag is
 * injected optionally, see
 * github.com/google/guice/wiki/FrequentlyAskedQuestions#how-can-i-inject-optional-parameters-into-a-constructor
 */
@VisibleForTesting
public static class PruneLoadSpecHolder
{
/**
 * Instance created directly with {@code new}, so its {@link #pruneLoadSpec} has the initial value {@code false}.
 */
@VisibleForTesting
public static final PruneLoadSpecHolder DEFAULT = new PruneLoadSpecHolder();

// Optional injection: the field keeps its initial value (false) unless a @PruneLoadSpec value is injected.
@Inject(optional = true) @PruneLoadSpec boolean pruneLoadSpec = false;
}

private final String dataSource;
private final Interval interval;
private final String version;
@Nullable
private final Map<String, Object> loadSpec;
private final List<String> dimensions;
private final List<String> metrics;
private final ShardSpec shardSpec;
private final long size;
private final String identifier;

/**
 * Creates a segment without an explicit {@link PruneLoadSpecHolder}: delegates to the constructor that accepts one,
 * passing {@link PruneLoadSpecHolder#DEFAULT}.
 */
public DataSegment(
String dataSource,
Interval interval,
String version,
Map<String, Object> loadSpec,
List<String> dimensions,
List<String> metrics,
ShardSpec shardSpec,
Integer binaryVersion,
long size
)
{
this(
dataSource,
interval,
version,
loadSpec,
dimensions,
metrics,
shardSpec,
binaryVersion,
size,
PruneLoadSpecHolder.DEFAULT
);
}

@JsonCreator
public DataSegment(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version,
// use `Map` *NOT* `LoadSpec` because we want to do lazy materialization to prevent dependency pollution
@JsonProperty("loadSpec") Map<String, Object> loadSpec,
@JsonProperty("dimensions") @JsonDeserialize(using = CommaListJoinDeserializer.class) List<String> dimensions,
@JsonProperty("metrics") @JsonDeserialize(using = CommaListJoinDeserializer.class) List<String> metrics,
@JsonProperty("shardSpec") ShardSpec shardSpec,
@JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
@JsonProperty("dimensions")
@JsonDeserialize(using = CommaListJoinDeserializer.class)
@Nullable
List<String> dimensions,
@JsonProperty("metrics")
@JsonDeserialize(using = CommaListJoinDeserializer.class)
@Nullable
List<String> metrics,
@JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
@JsonProperty("binaryVersion") Integer binaryVersion,
@JsonProperty("size") long size
@JsonProperty("size") long size,
@JacksonInject PruneLoadSpecHolder pruneLoadSpecHolder
)
{
final Predicate<String> nonEmpty = new Predicate<String>()
{
@Override
public boolean apply(String input)
{
return input != null && !input.isEmpty();
}
};

// dataSource, dimensions & metrics are stored as canonical string values to decrease memory required for storing large numbers of segments.
this.dataSource = interner.intern(dataSource);
// dataSource, dimensions & metrics are stored as canonical string values to decrease memory required for storing
// large numbers of segments.
this.dataSource = STRING_INTERNER.intern(dataSource);
this.interval = interval;
this.loadSpec = loadSpec;
this.loadSpec = pruneLoadSpecHolder.pruneLoadSpec ? PRUNED_LOAD_SPEC : prepareLoadSpec(loadSpec);
this.version = version;
this.dimensions = dimensions == null
? ImmutableList.<String>of()
: ImmutableList.copyOf(Iterables.transform(Iterables.filter(dimensions, nonEmpty), internFun));
this.metrics = metrics == null
? ImmutableList.<String>of()
: ImmutableList.copyOf(Iterables.transform(Iterables.filter(metrics, nonEmpty), internFun));
// Deduplicating dimensions and metrics lists as a whole because they are very likely the same for the same
// dataSource
this.dimensions = prepareDimensionsOrMetrics(dimensions, DIMENSIONS_INTERNER);
this.metrics = prepareDimensionsOrMetrics(metrics, METRICS_INTERNER);
this.shardSpec = (shardSpec == null) ? NoneShardSpec.instance() : shardSpec;
this.binaryVersion = binaryVersion;
this.size = size;
Expand All @@ -141,6 +180,35 @@ public boolean apply(String input)
);
}

/**
 * Copies the given load spec into a compact array-backed map with interned keys.
 *
 * @param loadSpec the deserialized load spec, may be null
 * @return a copy of {@code loadSpec} whose keys are interned, or null if {@code loadSpec} is null
 */
@Nullable
private Map<String, Object> prepareLoadSpec(@Nullable Map<String, Object> loadSpec)
{
  if (loadSpec != null) {
    // A load spec has only about 3 entries on average, and HashMap/LinkedHashMap consumes much more memory than
    // ArrayMap for that
    final Map<String, Object> compact = new Object2ObjectArrayMap<>(loadSpec.size());
    loadSpec.forEach((key, value) -> compact.put(STRING_INTERNER.intern(key), value));
    return compact;
  }
  return null;
}

/**
 * Builds the stored form of a dimensions or metrics list: null and empty names are dropped, the remaining names are
 * interned individually, and the resulting immutable list is interned as a whole.
 *
 * @param list     the deserialized names, may be null
 * @param interner the interner used to deduplicate the resulting list
 * @return an empty immutable list if {@code list} is null, otherwise the interned immutable list
 */
private List<String> prepareDimensionsOrMetrics(@Nullable List<String> list, Interner<List<String>> interner)
{
  if (list == null) {
    return ImmutableList.of();
  }
  // TODO replace with ImmutableList.toImmutableList() when updated to Guava 21+
  final List<String> internedNames = list
      .stream()
      .filter(name -> !Strings.isNullOrEmpty(name))
      .map(name -> STRING_INTERNER.intern(name))
      .collect(Collectors.toList());
  return interner.intern(ImmutableList.copyOf(internedNames));
}

/**
* Get dataSource
*
Expand All @@ -158,6 +226,7 @@ public Interval getInterval()
return interval;
}

@Nullable
@JsonProperty
public Map<String, Object> getLoadSpec()
{
Expand Down
3 changes: 0 additions & 3 deletions api/src/main/java/io/druid/timeline/DataSegmentUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@

import java.util.Objects;

/**
* identifier to DataSegment.
*/
@PublicApi
public class DataSegmentUtils
{
Expand Down
40 changes: 40 additions & 0 deletions api/src/main/java/io/druid/timeline/PruneLoadSpec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.timeline;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * This annotation is used to inject a boolean parameter into a {@link DataSegment} constructor, which instructs it to
 * drop the deserialized "loadSpec" instead of storing it in a field of the {@link DataSegment}. It's very useful on
 * Brokers, because they store a lot of DataSegments in their heap, and loadSpec takes a lot of space, while it's not
 * used on Brokers.
 */
@Target({ElementType.PARAMETER, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface PruneLoadSpec
{
}
12 changes: 11 additions & 1 deletion api/src/test/java/io/druid/timeline/DataSegmentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package io.druid.timeline;

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -27,15 +28,16 @@
import com.google.common.collect.Sets;
import io.druid.TestObjectMapper;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.jackson.JacksonUtils;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.jackson.JacksonUtils;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.ShardSpecLookup;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
Expand Down Expand Up @@ -87,6 +89,14 @@ public Map<String, Range<String>> getDomain()
};
}

/**
 * Registers the default {@link DataSegment.PruneLoadSpecHolder} as an injectable value on the mapper before each
 * test.
 */
@Before
public void setUp()
{
  final InjectableValues holderValues = new InjectableValues.Std()
      .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
  mapper.setInjectableValues(holderValues);
}

@Test
public void testV1Serialization() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ private Map<Integer, Long> loadOffsetFromPreviousMetaData(Object lastCommit)
}
log.info("Loaded offset map[%s]", offsetMap);
} else {
log.makeAlert("Unable to cast lastCommit to Map for feed [%s]", feed);
log.makeAlert("Unable to cast lastCommit to Map for feed [%s]", feed).emit();
}
return offsetMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,4 @@ public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteO
{
return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
}

}
7 changes: 7 additions & 0 deletions extensions-core/hdfs-storage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,13 @@
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
Expand Down
Loading

0 comments on commit 08a4e8f

Please sign in to comment.