Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #1727 - Union bySegment queries fix #1730

Merged
merged 1 commit into from
Sep 29, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 0 additions & 78 deletions common/src/main/java/io/druid/timeline/UnionTimeLineLookup.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1596,79 +1596,4 @@ private VersionedIntervalTimeline<String, Integer> makeStringIntegerTimeline()
return new VersionedIntervalTimeline<String, Integer>(Ordering.<String>natural());
}

@Test
public void testUnionTimeLineLookup()
{
TimelineLookup<String, Integer> lookup = new UnionTimeLineLookup<String, Integer>(
Arrays.<TimelineLookup<String, Integer>>asList(
timeline,
timeline
)
);
assertValues(
Arrays.asList(
createExpected("2011-04-01/2011-04-02", "3", 5),
createExpected("2011-04-01/2011-04-02", "3", 5),
createExpected("2011-04-02/2011-04-06", "2", 1),
createExpected("2011-04-02/2011-04-06", "2", 1),
createExpected("2011-04-06/2011-04-09", "3", 4),
createExpected("2011-04-06/2011-04-09", "3", 4)
),
(List)Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-09")))
);
}

@Test
public void testUnionTimeLineLookupNonExistentDelegates()
{
TimelineLookup<String, Integer> lookup = new UnionTimeLineLookup<String, Integer>(
Arrays.<TimelineLookup<String, Integer>>asList(
timeline,
null,
timeline,
null
)
);
assertValues(
Arrays.asList(
createExpected("2011-04-01/2011-04-02", "3", 5),
createExpected("2011-04-01/2011-04-02", "3", 5),
createExpected("2011-04-02/2011-04-06", "2", 1),
createExpected("2011-04-02/2011-04-06", "2", 1),
createExpected("2011-04-06/2011-04-09", "3", 4),
createExpected("2011-04-06/2011-04-09", "3", 4)
),
(List)Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-09"))) );
}

@Test
public void testUnionTimeLineLookupReturnsSortedValues()
{
timeline = makeStringIntegerTimeline();
add("2011-04-02/2011-04-06", "1", 1);
add("2011-04-03/2011-04-09", "9", 2);
VersionedIntervalTimeline t1 = timeline;
timeline = makeStringIntegerTimeline();
add("2011-04-01/2011-04-03", "2", 1);
add("2011-04-03/2011-04-10", "8", 2);
VersionedIntervalTimeline t2 = timeline;
TimelineLookup<String, Integer> lookup = new UnionTimeLineLookup<String, Integer>(
Arrays.<TimelineLookup<String, Integer>>asList(
t1, t2
)
);
assertValues(
Arrays.asList(
createExpected("2011-04-01/2011-04-03", "2", 1),
createExpected("2011-04-02/2011-04-03", "1", 1),
createExpected("2011-04-03/2011-04-09", "9", 2),
createExpected("2011-04-03/2011-04-10", "8", 2)

),
(List) Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-11")))
);
}



}
1 change: 1 addition & 0 deletions docs/content/querying/datasource.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ This data source unions two or more table data sources.
```

Note that the data sources being unioned should have the same schema.
Union Queries should be always sent to the broker/router node and are *NOT* supported directly by the historical nodes.

### Query Data Source

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,43 +153,29 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg
return getQueryRunnerImpl(query);
}

private <T> QueryRunner<T> getQueryRunnerImpl(final Query<T> query)
private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
{
return new UnionQueryRunner<>(
Iterables.transform(
query.getDataSource().getNames(), new Function<String, QueryRunner>()
{
@Override
public QueryRunner apply(String queryDataSource)
{
QueryRunner<T> queryRunner = null;
QueryRunner<T> queryRunner = null;
final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames());

for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
final Task task = taskRunnerWorkItem.getTask();
if (task.getDataSource().equals(queryDataSource)) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);
for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
final Task task = taskRunnerWorkItem.getTask();
if (task.getDataSource().equals(queryDataSource)) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);

if (taskQueryRunner != null) {
if (queryRunner == null) {
queryRunner = taskQueryRunner;
} else {
log.makeAlert("Found too many query runners for datasource")
.addData("dataSource", queryDataSource)
.emit();
}
}
}
}
if (queryRunner != null) {
return queryRunner;
} else {
return new NoopQueryRunner();
}
}
}
), conglomerate.findFactory(query).getToolchest()
);
if (taskQueryRunner != null) {
if (queryRunner == null) {
queryRunner = taskQueryRunner;
} else {
log.makeAlert("Found too many query runners for datasource")
.addData("dataSource", queryDataSource)
.emit();
}
}
}
}

return queryRunner == null ? new NoopQueryRunner<T>() : queryRunner;
}

private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem
Expand Down
27 changes: 14 additions & 13 deletions processing/src/main/java/io/druid/query/UnionQueryRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,50 +18,51 @@
package io.druid.query;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;

import java.util.Map;

public class UnionQueryRunner<T> implements QueryRunner<T>
{
private final Iterable<QueryRunner> baseRunners;
private final QueryRunner<T> baseRunner;
private final QueryToolChest<T, Query<T>> toolChest;

public UnionQueryRunner(
Iterable<QueryRunner> baseRunners,
QueryRunner<T> baseRunner,
QueryToolChest<T, Query<T>> toolChest
)
{
this.baseRunners = baseRunners;
this.baseRunner = baseRunner;
this.toolChest = toolChest;
}

@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
if (Iterables.size(baseRunners) == 1) {
return Iterables.getOnlyElement(baseRunners).run(query, responseContext);
} else {
DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) {
return toolChest.mergeSequencesUnordered(
Sequences.simple(
Iterables.transform(
baseRunners,
new Function<QueryRunner, Sequence<T>>()
Lists.transform(
((UnionDataSource) dataSource).getDataSources(),
new Function<DataSource, Sequence<T>>()
{
@Override
public Sequence<T> apply(QueryRunner singleRunner)
public Sequence<T> apply(DataSource singleSource)
{
return singleRunner.run(
query,
return baseRunner.run(
query.withDataSource(singleSource),
responseContext
);
}
}
)
)
);
} else {
return baseRunner.run(query, responseContext);
}
}

Expand Down
30 changes: 9 additions & 21 deletions processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,20 +253,19 @@ public static Collection<?> makeUnionQueryRunners(
return Arrays.asList(
new Object[][]{
{
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId), unionDataSource)
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId))
},
{
makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex), unionDataSource)
makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex))
},
{
makeUnionQueryRunner(
factory,
new QueryableIndexSegment(segmentId, mergedRealtimeIndex),
unionDataSource
new QueryableIndexSegment(segmentId, mergedRealtimeIndex)
)
},
{
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId), unionDataSource)
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId))
}
}
);
Expand Down Expand Up @@ -341,28 +340,17 @@ public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
}

public static <T> QueryRunner<T> makeUnionQueryRunner(
final QueryRunnerFactory<T, Query<T>> factory,
final Segment adapter,
final DataSource unionDataSource
QueryRunnerFactory<T, Query<T>> factory,
Segment adapter
)
{
return new FinalizeResultsQueryRunner<T>(
factory.getToolchest().postMergeQueryDecoration(
factory.getToolchest().mergeResults(
new UnionQueryRunner<T>(
Iterables.transform(
unionDataSource.getNames(), new Function<String, QueryRunner>()
{
@Nullable
@Override
public QueryRunner apply(@Nullable String input)
{
return new BySegmentQueryRunner<T>(
segmentId, adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
);
}
}
new BySegmentQueryRunner<T>(
segmentId, adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
),
factory.getToolchest()
)
Expand Down
Loading