Merge pull request #50 from confluentinc/harini-confluent
Revert "Revert "Avoid expensive findEntry call in segment metadata query (apache#10892)""
harinirajendran authored Oct 4, 2021
2 parents cffdf07 + cce3794 commit 289e47d
Showing 16 changed files with 448 additions and 134 deletions.
@@ -19,7 +19,7 @@

package org.apache.druid.timeline;

import org.apache.druid.timeline.partition.PartitionHolder;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;

import javax.annotation.Nullable;
@@ -51,5 +51,9 @@ public interface TimelineLookup<VersionType, ObjectType extends Overshadowable<ObjectType>>
*/
List<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval);

@Nullable PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version);
/**
* Finds the {@link PartitionChunk} for the given time interval, version and chunk number.
*/
@Nullable
PartitionChunk<ObjectType> findChunk(Interval interval, VersionType version, int partitionNum);
}
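
For context on the caller side: the removed findEntry returned an immutable copy of the whole PartitionHolder (built via asImmutable, which is deleted further down in this commit), whereas findChunk returns only the requested chunk. A minimal, hypothetical caller sketch; the class, method, and variable names below are illustrative and not part of this commit:

import javax.annotation.Nullable;

import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;

// Illustrative caller only; not part of the commit.
class FindChunkSketch
{
  @Nullable
  static DataSegment findSegment(
      VersionedIntervalTimeline<String, DataSegment> timeline,
      Interval interval,
      String version,
      int partitionNum
  )
  {
    // Before this change, callers used timeline.findEntry(interval, version), which
    // built an immutable copy of the entire PartitionHolder just to read one chunk.
    // findChunk looks up the single chunk directly instead.
    PartitionChunk<DataSegment> chunk = timeline.findChunk(interval, version, partitionNum);
    return chunk == null ? null : chunk.getObject();
  }
}
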
@@ -20,7 +20,6 @@
package org.apache.druid.timeline;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterators;
@@ -117,10 +116,14 @@ public static void addSegments(
)
{
timeline.addAll(
Iterators.transform(segments, segment -> segment.getShardSpec().createChunk(segment)),
DataSegment::getInterval,
DataSegment::getVersion
);
Iterators.transform(
segments,
segment -> new PartitionChunkEntry<>(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(segment)
)
));
}

public Map<Interval, TreeMap<VersionType, TimelineEntry>> getAllTimelineEntries()
@@ -183,13 +186,11 @@ public Set<ObjectType> findNonOvershadowedObjectsInInterval(Interval interval, P

public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
{
addAll(Iterators.singletonIterator(object), o -> interval, o -> version);
addAll(Iterators.singletonIterator(new PartitionChunkEntry<>(interval, version, object)));
}

private void addAll(
final Iterator<PartitionChunk<ObjectType>> objects,
final Function<ObjectType, Interval> intervalFunction,
final Function<ObjectType, VersionType> versionFunction
public void addAll(
final Iterator<PartitionChunkEntry<VersionType, ObjectType>> objects
)
{
lock.writeLock().lock();
@@ -198,9 +199,10 @@ private void addAll(
final IdentityHashMap<TimelineEntry, Interval> allEntries = new IdentityHashMap<>();

while (objects.hasNext()) {
PartitionChunk<ObjectType> object = objects.next();
Interval interval = intervalFunction.apply(object.getObject());
VersionType version = versionFunction.apply(object.getObject());
PartitionChunkEntry<VersionType, ObjectType> chunkEntry = objects.next();
PartitionChunk<ObjectType> object = chunkEntry.getChunk();
Interval interval = chunkEntry.getInterval();
VersionType version = chunkEntry.getVersion();
Map<VersionType, TimelineEntry> exists = allTimelineEntries.get(interval);
TimelineEntry entry;

@@ -284,15 +286,15 @@ public PartitionChunk<ObjectType> remove(Interval interval, VersionType version,

@Override
@Nullable
public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version)
public PartitionChunk<ObjectType> findChunk(Interval interval, VersionType version, int partitionNum)
{
lock.readLock().lock();
try {
for (Entry<Interval, TreeMap<VersionType, TimelineEntry>> entry : allTimelineEntries.entrySet()) {
if (entry.getKey().equals(interval) || entry.getKey().contains(interval)) {
TimelineEntry foundEntry = entry.getValue().get(version);
if (foundEntry != null) {
return foundEntry.getPartitionHolder().asImmutable();
return foundEntry.getPartitionHolder().getChunk(partitionNum);
}
}
}
@@ -849,4 +851,41 @@ public int hashCode()
return Objects.hash(trueInterval, version, partitionHolder);
}
}

/**
* Stores a {@link PartitionChunk} for a given interval and version. The
* interval corresponds to the {@link LogicalSegment#getInterval()}
*/
public static class PartitionChunkEntry<VersionType, ObjectType>
{
private final Interval interval;
private final VersionType version;
private final PartitionChunk<ObjectType> chunk;

public PartitionChunkEntry(
Interval interval,
VersionType version,
PartitionChunk<ObjectType> chunk
)
{
this.interval = interval;
this.version = version;
this.chunk = chunk;
}

public Interval getInterval()
{
return interval;
}

public VersionType getVersion()
{
return version;
}

public PartitionChunk<ObjectType> getChunk()
{
return chunk;
}
}
}
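
Note that addAll is now public and consumes PartitionChunkEntry objects, so callers supply the interval and version alongside each chunk instead of passing two extractor functions. A minimal usage sketch mirroring what addSegments above does, assuming a list of DataSegments to load; the class and method names here are illustrative only:

import java.util.List;

import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.VersionedIntervalTimeline.PartitionChunkEntry;

// Illustrative helper, not part of the commit: bulk-adds segments by wrapping each
// chunk, together with its interval and version, in a PartitionChunkEntry.
class PartitionChunkEntrySketch
{
  static void addAllSegments(
      VersionedIntervalTimeline<String, DataSegment> timeline,
      List<DataSegment> segments
  )
  {
    timeline.addAll(
        segments.stream()
                .map(segment -> new PartitionChunkEntry<String, DataSegment>(
                    segment.getInterval(),
                    segment.getVersion(),
                    segment.getShardSpec().createChunk(segment)
                ))
                .iterator()
    );
  }
}
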

This file was deleted.

@@ -65,11 +65,6 @@ protected PartitionHolder(OvershadowableManager<T> overshadowableManager)
this.overshadowableManager = overshadowableManager;
}

public ImmutablePartitionHolder<T> asImmutable()
{
return new ImmutablePartitionHolder<>(OvershadowableManager.copyVisible(overshadowableManager));
}

public boolean add(PartitionChunk<T> chunk)
{
return overshadowableManager.addChunk(chunk);
@@ -21,6 +21,7 @@

/**
*/
@Deprecated
public class SingleElementPartitionChunk<T> implements PartitionChunk<T>
{
private final T element;
@@ -25,7 +25,6 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.partition.IntegerPartitionChunk;
import org.apache.druid.timeline.partition.OvershadowableInteger;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.DateTime;
import org.joda.time.Days;
import org.joda.time.Hours;
@@ -221,36 +220,64 @@ public void testRemove()
}

@Test
public void testFindEntry()
public void testFindChunk()
{
Assert.assertEquals(
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(),
timeline.findEntry(Intervals.of("2011-10-01/2011-10-02"), "1")
assertSingleElementChunks(
makeSingle("1", 1),
timeline.findChunk(Intervals.of("2011-10-01/2011-10-02"), "1", 0)
);

Assert.assertEquals(
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(),
timeline.findEntry(Intervals.of("2011-10-01/2011-10-01T10"), "1")
assertSingleElementChunks(
makeSingle("1", 1),
timeline.findChunk(Intervals.of("2011-10-01/2011-10-01T10"), "1", 0)
);

assertSingleElementChunks(
makeSingle("1", 1),
timeline.findChunk(Intervals.of("2011-10-01T02/2011-10-02"), "1", 0)
);

assertSingleElementChunks(
makeSingle("1", 1),
timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-01T17"), "1", 0)
);

IntegerPartitionChunk<OvershadowableInteger> expected = IntegerPartitionChunk.make(
10,
null,
1,
new OvershadowableInteger(
"3",
1,
21
)
);
IntegerPartitionChunk<OvershadowableInteger> actual = (IntegerPartitionChunk<OvershadowableInteger>) timeline.findChunk(
Intervals.of("2011-10-02/2011-10-03"),
"3",
1
);
Assert.assertEquals(expected, actual);
Assert.assertEquals(expected.getObject(), actual.getObject());

Assert.assertEquals(
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(),
timeline.findEntry(Intervals.of("2011-10-01T02/2011-10-02"), "1")
null,
timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-01T17"), "1", 1)
);

Assert.assertEquals(
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(),
timeline.findEntry(Intervals.of("2011-10-01T04/2011-10-01T17"), "1")
null,
timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-01T17"), "2", 0)
);

Assert.assertEquals(
null,
timeline.findEntry(Intervals.of("2011-10-01T04/2011-10-01T17"), "2")
timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-02T17"), "1", 0)
);

Assert.assertEquals(
null,
timeline.findEntry(Intervals.of("2011-10-01T04/2011-10-02T17"), "1")
timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-02T17"), "1", 0)
);
}

@@ -51,14 +51,14 @@ public void setUp()
}

@Test
public void testFindEntryWithOverlap()
public void testFindChunkWithOverlap()
{
add("2011-01-01/2011-01-10", "1", 1);
add("2011-01-02/2011-01-05", "2", 1);

Assert.assertEquals(
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(),
timeline.findEntry(Intervals.of("2011-01-02T02/2011-01-04"), "1")
assertSingleElementChunks(
makeSingle("1", 1),
timeline.findChunk(Intervals.of("2011-01-02T02/2011-01-04"), "1", 0)
);
}

@@ -98,6 +98,16 @@ static void assertValues(
Assert.assertEquals(expected, actualSet);
}

static void assertSingleElementChunks(
PartitionChunk<OvershadowableInteger> expected,
PartitionChunk<OvershadowableInteger> actual
)
{
SingleElementPartitionChunk<OvershadowableInteger> expectedSingle = (SingleElementPartitionChunk<OvershadowableInteger>) expected;
SingleElementPartitionChunk<OvershadowableInteger> actualSingle = (SingleElementPartitionChunk<OvershadowableInteger>) actual;
Assert.assertEquals(expectedSingle.getObject(), actualSingle.getObject());
}

static VersionedIntervalTimeline<String, OvershadowableInteger> makeStringIntegerTimeline()
{
return new VersionedIntervalTimeline<>(Ordering.natural());
(Diffs for the remaining changed files are not shown.)
