Skip to content

Commit

Permalink
fix apache#1727 - Union bySegment queries fix
Browse files Browse the repository at this point in the history
Fixes apache#1727.
revert to doing merging for results for union queries on broker.

revert unrelated changes

Add test for union query runner

Add test

remove unused imports

fix imports

fix renamed file

fix test

update docs.
  • Loading branch information
nishantmonu51 committed Sep 29, 2015
1 parent d60610c commit 573aa96
Show file tree
Hide file tree
Showing 12 changed files with 238 additions and 380 deletions.
78 changes: 0 additions & 78 deletions common/src/main/java/io/druid/timeline/UnionTimeLineLookup.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1596,79 +1596,4 @@ private VersionedIntervalTimeline<String, Integer> makeStringIntegerTimeline()
return new VersionedIntervalTimeline<String, Integer>(Ordering.<String>natural());
}

@Test
public void testUnionTimeLineLookup()
{
TimelineLookup<String, Integer> lookup = new UnionTimeLineLookup<String, Integer>(
Arrays.<TimelineLookup<String, Integer>>asList(
timeline,
timeline
)
);
assertValues(
Arrays.asList(
createExpected("2011-04-01/2011-04-02", "3", 5),
createExpected("2011-04-01/2011-04-02", "3", 5),
createExpected("2011-04-02/2011-04-06", "2", 1),
createExpected("2011-04-02/2011-04-06", "2", 1),
createExpected("2011-04-06/2011-04-09", "3", 4),
createExpected("2011-04-06/2011-04-09", "3", 4)
),
(List)Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-09")))
);
}

@Test
public void testUnionTimeLineLookupNonExistentDelegates()
{
TimelineLookup<String, Integer> lookup = new UnionTimeLineLookup<String, Integer>(
Arrays.<TimelineLookup<String, Integer>>asList(
timeline,
null,
timeline,
null
)
);
assertValues(
Arrays.asList(
createExpected("2011-04-01/2011-04-02", "3", 5),
createExpected("2011-04-01/2011-04-02", "3", 5),
createExpected("2011-04-02/2011-04-06", "2", 1),
createExpected("2011-04-02/2011-04-06", "2", 1),
createExpected("2011-04-06/2011-04-09", "3", 4),
createExpected("2011-04-06/2011-04-09", "3", 4)
),
(List)Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-09"))) );
}

@Test
public void testUnionTimeLineLookupReturnsSortedValues()
{
timeline = makeStringIntegerTimeline();
add("2011-04-02/2011-04-06", "1", 1);
add("2011-04-03/2011-04-09", "9", 2);
VersionedIntervalTimeline t1 = timeline;
timeline = makeStringIntegerTimeline();
add("2011-04-01/2011-04-03", "2", 1);
add("2011-04-03/2011-04-10", "8", 2);
VersionedIntervalTimeline t2 = timeline;
TimelineLookup<String, Integer> lookup = new UnionTimeLineLookup<String, Integer>(
Arrays.<TimelineLookup<String, Integer>>asList(
t1, t2
)
);
assertValues(
Arrays.asList(
createExpected("2011-04-01/2011-04-03", "2", 1),
createExpected("2011-04-02/2011-04-03", "1", 1),
createExpected("2011-04-03/2011-04-09", "9", 2),
createExpected("2011-04-03/2011-04-10", "8", 2)

),
(List) Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-11")))
);
}



}
1 change: 1 addition & 0 deletions docs/content/querying/datasource.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ This data source unions two or more table data sources.
```

Note that the data sources being unioned should have the same schema.
Union Queries should be always sent to the broker/router node and are *NOT* supported directly by the historical nodes.

### Query Data Source

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,43 +153,29 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg
return getQueryRunnerImpl(query);
}

private <T> QueryRunner<T> getQueryRunnerImpl(final Query<T> query)
private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
{
return new UnionQueryRunner<>(
Iterables.transform(
query.getDataSource().getNames(), new Function<String, QueryRunner>()
{
@Override
public QueryRunner apply(String queryDataSource)
{
QueryRunner<T> queryRunner = null;
QueryRunner<T> queryRunner = null;
final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames());

for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
final Task task = taskRunnerWorkItem.getTask();
if (task.getDataSource().equals(queryDataSource)) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);
for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
final Task task = taskRunnerWorkItem.getTask();
if (task.getDataSource().equals(queryDataSource)) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);

if (taskQueryRunner != null) {
if (queryRunner == null) {
queryRunner = taskQueryRunner;
} else {
log.makeAlert("Found too many query runners for datasource")
.addData("dataSource", queryDataSource)
.emit();
}
}
}
}
if (queryRunner != null) {
return queryRunner;
} else {
return new NoopQueryRunner();
}
}
}
), conglomerate.findFactory(query).getToolchest()
);
if (taskQueryRunner != null) {
if (queryRunner == null) {
queryRunner = taskQueryRunner;
} else {
log.makeAlert("Found too many query runners for datasource")
.addData("dataSource", queryDataSource)
.emit();
}
}
}
}

return queryRunner == null ? new NoopQueryRunner<T>() : queryRunner;
}

private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem
Expand Down
27 changes: 14 additions & 13 deletions processing/src/main/java/io/druid/query/UnionQueryRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,50 +18,51 @@
package io.druid.query;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;

import java.util.Map;

public class UnionQueryRunner<T> implements QueryRunner<T>
{
private final Iterable<QueryRunner> baseRunners;
private final QueryRunner<T> baseRunner;
private final QueryToolChest<T, Query<T>> toolChest;

public UnionQueryRunner(
Iterable<QueryRunner> baseRunners,
QueryRunner<T> baseRunner,
QueryToolChest<T, Query<T>> toolChest
)
{
this.baseRunners = baseRunners;
this.baseRunner = baseRunner;
this.toolChest = toolChest;
}

@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
if (Iterables.size(baseRunners) == 1) {
return Iterables.getOnlyElement(baseRunners).run(query, responseContext);
} else {
DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) {
return toolChest.mergeSequencesUnordered(
Sequences.simple(
Iterables.transform(
baseRunners,
new Function<QueryRunner, Sequence<T>>()
Lists.transform(
((UnionDataSource) dataSource).getDataSources(),
new Function<DataSource, Sequence<T>>()
{
@Override
public Sequence<T> apply(QueryRunner singleRunner)
public Sequence<T> apply(DataSource singleSource)
{
return singleRunner.run(
query,
return baseRunner.run(
query.withDataSource(singleSource),
responseContext
);
}
}
)
)
);
} else {
return baseRunner.run(query, responseContext);
}
}

Expand Down
30 changes: 9 additions & 21 deletions processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,20 +253,19 @@ public static Collection<?> makeUnionQueryRunners(
return Arrays.asList(
new Object[][]{
{
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId), unionDataSource)
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId))
},
{
makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex), unionDataSource)
makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex))
},
{
makeUnionQueryRunner(
factory,
new QueryableIndexSegment(segmentId, mergedRealtimeIndex),
unionDataSource
new QueryableIndexSegment(segmentId, mergedRealtimeIndex)
)
},
{
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId), unionDataSource)
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId))
}
}
);
Expand Down Expand Up @@ -341,28 +340,17 @@ public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
}

public static <T> QueryRunner<T> makeUnionQueryRunner(
final QueryRunnerFactory<T, Query<T>> factory,
final Segment adapter,
final DataSource unionDataSource
QueryRunnerFactory<T, Query<T>> factory,
Segment adapter
)
{
return new FinalizeResultsQueryRunner<T>(
factory.getToolchest().postMergeQueryDecoration(
factory.getToolchest().mergeResults(
new UnionQueryRunner<T>(
Iterables.transform(
unionDataSource.getNames(), new Function<String, QueryRunner>()
{
@Nullable
@Override
public QueryRunner apply(@Nullable String input)
{
return new BySegmentQueryRunner<T>(
segmentId, adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
);
}
}
new BySegmentQueryRunner<T>(
segmentId, adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
),
factory.getToolchest()
)
Expand Down
Loading

0 comments on commit 573aa96

Please sign in to comment.