Skip to content

Commit

Permalink
apache#1634 Handle duplicated column names in Join
Browse files Browse the repository at this point in the history
  • Loading branch information
navis committed Mar 11, 2019
1 parent d1a7091 commit 3f62753
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 10 deletions.
8 changes: 5 additions & 3 deletions processing/src/main/java/io/druid/query/JoinQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -465,17 +465,19 @@ public JoinDelegate toArrayJoin()
@Override
public List<String> estimatedOutputColumns()
{
List<Query<Map<String, Object>>> queries = getQueries();
Set<String> uniqueNames = Sets.newHashSet();
List<String> outputColumns = Lists.newArrayList();

List<Query<Map<String, Object>>> queries = getQueries();
for (int i = 0; i < queries.size(); i++) {
List<String> columns = ((ArrayOutputSupport<?>) queries.get(i)).estimatedOutputColumns();
Preconditions.checkArgument(!GuavaUtils.isNullOrEmpty(columns));
if (prefixAliases == null) {
outputColumns.addAll(columns);
Queries.uniqueNames(columns, uniqueNames, outputColumns);
} else {
String alias = prefixAliases.get(i) + ".";
for (String column : columns) {
outputColumns.add(alias + column);
outputColumns.add(Queries.uniqueName(alias + column, uniqueNames));
}
}
}
Expand Down
33 changes: 30 additions & 3 deletions processing/src/main/java/io/druid/query/Queries.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,25 +206,52 @@ private static Schema _relaySchema(Query subQuery, QuerySegmentWalker segmentWal
final JoinQuery.JoinDelegate joinQuery = (JoinQuery.JoinDelegate) subQuery;
List queries = joinQuery.getQueries();
List<String> aliases = joinQuery.getPrefixAliases();
Set<String> uniqueNames = Sets.newHashSet();
for (int i = 0; i < queries.size(); i++) {
final Schema schema = relaySchema((Query) queries.get(i), segmentWalker);
final String prefix = aliases == null ? "" : aliases.get(i) + ".";
for (Pair<String, ValueDesc> pair : schema.dimensionAndTypes()) {
dimensionNames.add(prefix + pair.lhs);
dimensionNames.add(uniqueName(prefix + pair.lhs, uniqueNames));
dimensionTypes.add(pair.rhs);
}
for (Pair<String, ValueDesc> pair : schema.metricAndTypes()) {
metricNames.add(prefix + pair.lhs);
metricNames.add(uniqueName(prefix + pair.lhs, uniqueNames));
metricTypes.add(pair.rhs);
}
}
} else {
// todo union-all (partitioned-join, etc.)
throw new UnsupportedOperationException("Cannot extract metric from query " + subQuery);
throw new UnsupportedOperationException("Cannot extract schema from query " + subQuery);
}
LOG.info("resolved schema : %s + %s", dimensionNames, metricNames);
return new Schema(dimensionNames, metricNames, GuavaUtils.concatish(dimensionTypes, metricTypes));
}

// keep the same convention with calcite (see SqlValidatorUtil.addFields)
public static List<String> uniqueNames(List<String> names1, List<String> names2)
{
  // Concatenate names1 and names2 into one list, disambiguating duplicates
  // against everything appended so far (first occurrence keeps its name).
  final Set<String> seen = Sets.newHashSet();
  final List<String> merged = Lists.newArrayList();
  uniqueNames(names1, seen, merged);
  uniqueNames(names2, seen, merged);
  return merged;
}

public static List<String> uniqueNames(List<String> names, Set<String> uniqueNames, List<String> appendTo)
{
  // Append every name to 'appendTo', rewriting any name already present in
  // 'uniqueNames' so the output contains no duplicates. Returns 'appendTo'
  // to allow call chaining.
  for (int i = 0; i < names.size(); i++) {
    appendTo.add(uniqueName(names.get(i), uniqueNames));
  }
  return appendTo;
}

public static String uniqueName(String name, Set<String> uniqueNames)
{
  // Ensure that name is unique from all previous field names: if 'name' was
  // already seen, append an increasing numeric suffix (name0, name1, ...)
  // until an unused variant is found. The chosen name is recorded in
  // 'uniqueNames' as a side effect.
  final String base = name;
  int suffix = 0;
  while (!uniqueNames.add(name)) {
    name = base + suffix++;
  }
  return name;
}

@SuppressWarnings("unchecked")
public static <I> Sequence<Row> convertToRow(Query<I> subQuery, Sequence<I> sequence)
{
Expand Down
7 changes: 5 additions & 2 deletions sql/src/main/java/io/druid/sql/calcite/rel/DruidJoinRel.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.common.guava.GuavaUtils;
import com.metamx.common.logger.Logger;
import io.druid.query.Druids;
import io.druid.query.JoinElement;
import io.druid.query.JoinType;
import io.druid.query.Queries;
import io.druid.query.Query;
import io.druid.query.QueryDataSource;
import io.druid.sql.calcite.Utils;
Expand All @@ -51,6 +52,8 @@

public class DruidJoinRel extends DruidRel<DruidJoinRel>
{
private static final Logger LOG = new Logger(DruidJoinRel.class);

public static DruidJoinRel create(Join join, JoinInfo joinInfo, DruidRel left, DruidRel right)
{
return new DruidJoinRel(
Expand Down Expand Up @@ -120,7 +123,7 @@ public DruidQuery toDruidQuery(boolean finalizeAggregations)
final List<String> leftOrder = leftQuery.getOutputRowSignature().getRowOrder();
final List<String> rightOrder = rightQuery.getOutputRowSignature().getRowOrder();

final RowSignature outRowSignature = RowSignature.from(GuavaUtils.concat(leftOrder, rightOrder), rowType);
final RowSignature outRowSignature = RowSignature.from(Queries.uniqueNames(leftOrder, rightOrder), rowType);

final List<String> leftKeys = Lists.newArrayList();
for (int leftKey : leftExpressions) {
Expand Down
3 changes: 1 addition & 2 deletions sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,6 @@ public void testGroupBySingleColumnDescendingNoTopN() throws Exception
@Test
public void testSelfJoinWithFallback() throws Exception
{
// todo x.dim1, y.dim1, y.dim2 --> "abc", "def", "abc" ?
testQuery(
PLANNER_CONFIG_FALLBACK,
"SELECT x.dim1, y.dim1, y.dim2\n"
Expand Down Expand Up @@ -820,7 +819,7 @@ public void testSelfJoinWithFallback() throws Exception
.build()
),
ImmutableList.of(
new Object[]{"def", "def", "abc"}
new Object[]{"abc", "def", "abc"}
)
);
}
Expand Down

0 comments on commit 3f62753

Please sign in to comment.