Skip to content

Commit

Permalink
apache#3031 Join query instance should be read-only
Browse files Browse the repository at this point in the history
  • Loading branch information
navis committed Jan 14, 2020
1 parent 3291b59 commit 7921796
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 24 deletions.
19 changes: 17 additions & 2 deletions processing/src/main/java/io/druid/query/JoinQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.logger.Logger;
import io.druid.common.DateTimes;
import io.druid.common.guava.GuavaUtils;
import io.druid.common.utils.Sequences;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.groupby.orderby.OrderByColumnSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.segment.column.Column;
Expand Down Expand Up @@ -347,6 +347,21 @@ public JoinQuery withPrefixAlias(boolean prefixAlias)
);
}

/**
 * Returns a new {@code JoinQuery} identical to this one except that its
 * data-source mapping is replaced by the given map. This instance is not
 * modified — part of keeping join query instances read-only; callers that
 * need to rewrite nested data sources (e.g. {@code Queries.iterate}) build
 * a fresh copy instead of mutating this query's map.
 *
 * NOTE(review): the map is passed straight to the constructor; whether the
 * constructor defensively copies it is not visible here — confirm before
 * sharing a mutable map with the returned query.
 *
 * @param dataSources alias-to-DataSource mapping for the copy
 * @return a new JoinQuery carrying the given data sources and all other
 *         settings (segment spec, elements, aliasing, limits, context)
 *         copied from this instance
 */
public JoinQuery withDataSources(Map<String, DataSource> dataSources)
{
return new JoinQuery(
dataSources,
getQuerySegmentSpec(),
elements,
prefixAlias,
asArray,
timeColumnName,
limit,
maxRowsInGroup,
getContext()
);
}

@Override
public String toString()
{
Expand Down
8 changes: 7 additions & 1 deletion processing/src/main/java/io/druid/query/Queries.java
Original file line number Diff line number Diff line change
Expand Up @@ -373,15 +373,21 @@ public static Query iterate(Query query, Function<Query, Query> function)
}
} else if (query instanceof JoinQuery) {
JoinQuery joinQuery = (JoinQuery) query;
Map<String, DataSource> rewritten = Maps.newHashMap();
for (Map.Entry<String, DataSource> entry : joinQuery.getDataSources().entrySet()) {
if (entry.getValue() instanceof QueryDataSource) {
Query source = ((QueryDataSource) entry.getValue()).getQuery();
Query converted = iterate(source, function);
if (source != converted) {
entry.setValue(QueryDataSource.of(converted));
rewritten.put(entry.getKey(), QueryDataSource.of(converted));
}
}
}
if (!rewritten.isEmpty()) {
Map<String, DataSource> copy = Maps.newHashMap(joinQuery.getDataSources());
copy.putAll(rewritten);
query = joinQuery.withDataSources(copy);
}
} else if (query instanceof UnionAllQuery) {
UnionAllQuery<?> union = (UnionAllQuery) query;
if (union.getQuery() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
cancelUrl = new URL(String.format("http://%s/druid/v2/%s", host, query.getId()));

if (!query.getContextBoolean(Query.DISABLE_LOG, false)) {
log.debug("Querying queryId[%s] url[%s]", query.getId(), url);
log.debug("Querying [%s][%s:%s] to url[%s]", query.getId(), query.getType(), query.getDataSource(), url);
}

final QueryMetrics<? super Query<T>> queryMetrics = toolChest.makeMetrics(query);
Expand Down
13 changes: 7 additions & 6 deletions server/src/main/java/io/druid/client/StreamHandlerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public StreamHandler create(final Query query, final URL url, final int queueSiz

private class BaseHandler implements StreamHandler
{
private final String queryId;
private final Query query;
private final boolean disableLog;
private final URL url;

Expand All @@ -88,7 +88,7 @@ private class BaseHandler implements StreamHandler

private BaseHandler(Query query, URL url, int queueSize)
{
this.queryId = query.getId();
this.query = query;
this.disableLog = query.getContextBoolean(Query.DISABLE_LOG, false);
this.url = url;
this.queue = new LinkedBlockingDeque<>(queueSize <= 0 ? Integer.MAX_VALUE : queueSize);
Expand All @@ -111,8 +111,9 @@ public ClientResponse<InputStream> handleResponse(HttpResponse response)
HttpResponseStatus status = response.getStatus();
if (!disableLog) {
log.debug(
"Initial response from url[%s] for queryId[%s] with status[%s] in %,d msec",
url, queryId, status, TimeUnit.NANOSECONDS.toMillis(responseStartTimeNs - requestStartTimeNs)
"Initial response from url[%s] for [%s][%s:%s] with status[%s] in %,d msec",
url, query.getId(), query.getType(), query.getDataSource(),
status, TimeUnit.NANOSECONDS.toMillis(responseStartTimeNs - requestStartTimeNs)
);
}

Expand Down Expand Up @@ -236,8 +237,8 @@ public ClientResponse<InputStream> done(ClientResponse<InputStream> clientRespon
long nodeTimeNs = stopTimeNs - requestStartTimeNs;
if (!disableLog) {
log.debug(
"Completed queryId[%s] request to url[%s] with %,d bytes in %,d msec [%s/s].",
queryId,
"Completed [%s][%s:%s] request to url[%s] with %,d bytes in %,d msec [%s/s].",
query.getId(), query.getType(), query.getDataSource(),
url,
byteCount.get(),
TimeUnit.NANOSECONDS.toMillis(nodeTimeNs),
Expand Down
31 changes: 18 additions & 13 deletions sql/src/main/java/io/druid/sql/calcite/rel/DruidJoinRel.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.Lists;
import io.druid.query.Druids;
import io.druid.query.JoinElement;
import io.druid.query.JoinQuery;
import io.druid.query.JoinType;
import io.druid.query.Queries;
import io.druid.query.Query;
Expand Down Expand Up @@ -126,6 +127,22 @@ public DruidQuery toDruidQuery(boolean finalizeAggregations)
rightKeys.add(rightOrder.get(rightKey));
}

final Query leftDruid = leftQuery.getQuery();
final Query rightDruid = rightQuery.getQuery();
final String leftAlias = toAlias(leftDruid);

String rightAlias = toAlias(rightDruid);
while (leftAlias.equals(rightAlias)) {
rightAlias += "$";
}

final JoinQuery query = new Druids.JoinQueryBuilder()
.dataSource(leftAlias, QueryDataSource.of(leftDruid))
.dataSource(rightAlias, QueryDataSource.of(rightDruid))
.element(new JoinElement(JoinType.fromString(joinType.name()), leftAlias, leftKeys, rightAlias, rightKeys))
.asArray(true)
.build();

return new DruidQuery()
{
@Override
Expand All @@ -143,19 +160,7 @@ public RowSignature getOutputRowSignature()
@Override
public Query getQuery()
{
final Query leftDruid = leftQuery.getQuery();
final Query rightDruid = rightQuery.getQuery();
final String leftAlias = toAlias(leftDruid);
String rightAlias = toAlias(rightDruid);
while (leftAlias.equals(rightAlias)) {
rightAlias += "$";
}
return new Druids.JoinQueryBuilder()
.dataSource(leftAlias, QueryDataSource.of(leftDruid))
.dataSource(rightAlias, QueryDataSource.of(rightDruid))
.element(new JoinElement(JoinType.fromString(joinType.name()), leftAlias, leftKeys, rightAlias, rightKeys))
.asArray(true)
.build();
return query;
}
};
}
Expand Down
31 changes: 30 additions & 1 deletion sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.druid.client.ServerView;
import io.druid.client.TimelineServerView;
import io.druid.common.utils.Sequences;
import io.druid.data.ValueDesc;
Expand All @@ -40,10 +42,12 @@
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.query.metadata.metadata.SegmentMetadataQuery.AnalysisType;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.sql.calcite.table.DruidTable.WithTimestamp;
import io.druid.sql.calcite.table.RowSignature;
import io.druid.sql.calcite.view.DruidViewMacro;
import io.druid.sql.calcite.view.ViewManager;
import io.druid.timeline.DataSegment;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;

Expand All @@ -59,7 +63,7 @@ public class DruidSchema extends AbstractSchema
implements BiFunction<String, WithTimestamp, WithTimestamp>
{
public static final String NAME = "druid";
public static final long CACHE_VALID_MSEC = 20_000; // 20 sec
public static final long CACHE_VALID_MSEC = 300_000; // 5 min

private final QuerySegmentWalker segmentWalker;
private final TimelineServerView serverView;
Expand All @@ -77,6 +81,31 @@ public DruidSchema(
this.serverView = Preconditions.checkNotNull(serverView, "serverView");
this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager");
this.cached = Maps.newConcurrentMap();
serverView.registerTimelineCallback(
MoreExecutors.sameThreadExecutor(),
new TimelineServerView.TimelineCallback()
{
@Override
public ServerView.CallbackAction timelineInitialized()
{
return ServerView.CallbackAction.CONTINUE;
}

@Override
public ServerView.CallbackAction segmentAdded(final DruidServerMetadata server, final DataSegment segment)
{
cached.remove(segment.getDataSource());
return ServerView.CallbackAction.CONTINUE;
}

@Override
public ServerView.CallbackAction segmentRemoved(final DruidServerMetadata server, final DataSegment segment)
{
cached.remove(segment.getDataSource());
return ServerView.CallbackAction.CONTINUE;
}
}
);
}

@Override
Expand Down

0 comments on commit 7921796

Please sign in to comment.