Preserve column order in DruidSchema, SegmentMetadataQuery. #12754

Merged (4 commits) on Jul 9, 2022
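The whole change hinges on map iteration order: HashMap iterates in hash order and TreeMap in alphabetical key order, while LinkedHashMap iterates in insertion order, which is what lets the ingestion-time column order survive all the way to SQL consumers. A minimal, standalone demonstration of the difference (the column names are made up for illustration, not taken from the patch):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class ColumnOrderDemo
{
  public static void main(String[] args)
  {
    // Columns in "ingestion" order: __time first, then dimensions, then metrics.
    String[] columns = {"__time", "page", "user", "added", "deleted"};

    Map<String, String> hashed = new HashMap<>();
    Map<String, String> sorted = new TreeMap<>();
    Map<String, String> linked = new LinkedHashMap<>();

    for (String column : columns) {
      hashed.put(column, "analysis");
      sorted.put(column, "analysis");
      linked.put(column, "analysis");
    }

    System.out.println("HashMap:       " + hashed.keySet()); // order depends on hash codes
    System.out.println("TreeMap:       " + sorted.keySet()); // [__time, added, deleted, page, user]
    System.out.println("LinkedHashMap: " + linked.keySet()); // [__time, page, user, added, deleted]
  }
}
```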
@@ -57,8 +57,7 @@
import org.openjdk.jmh.infra.Blackhole;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@@ -113,7 +112,7 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segme
protected Sequence<SegmentAnalysis> runSegmentMetadataQuery(Iterable<SegmentId> segments)
{
final int numColumns = 1000;
Map<String, ColumnAnalysis> columnToAnalysisMap = new HashMap<>();
LinkedHashMap<String, ColumnAnalysis> columnToAnalysisMap = new LinkedHashMap<>();
for (int i = 0; i < numColumns; ++i) {
columnToAnalysisMap.put(
"col" + i,
@@ -58,8 +58,8 @@
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class SegmentAnalyzer
{
@@ -98,7 +98,8 @@ public Map<String, ColumnAnalysis> analyze(Segment segment)
// get length and column names from storageAdapter
final int length = storageAdapter.getNumRows();

Map<String, ColumnAnalysis> columns = new TreeMap<>();
// Use LinkedHashMap to preserve column order.
final Map<String, ColumnAnalysis> columns = new LinkedHashMap<>();

final RowSignature rowSignature = storageAdapter.getRowSignature();
for (String columnName : rowSignature.getColumnNames()) {
@@ -54,17 +54,19 @@
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.timeline.LogicalSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAnalysis, SegmentMetadataQuery>
@@ -108,7 +110,8 @@ public Sequence<SegmentAnalysis> doRun(
ResponseContext context
)
{
SegmentMetadataQuery updatedQuery = ((SegmentMetadataQuery) queryPlus.getQuery()).withFinalizedAnalysisTypes(config);
SegmentMetadataQuery updatedQuery = ((SegmentMetadataQuery) queryPlus.getQuery()).withFinalizedAnalysisTypes(
config);
QueryPlus<SegmentAnalysis> updatedQueryPlus = queryPlus.withQuery(updatedQuery);
return new MappedSequence<>(
CombiningSequence.create(
@@ -135,7 +138,12 @@ private BinaryOperator<SegmentAnalysis> createMergeFn(final SegmentMetadataQuery
@Override
public BinaryOperator<SegmentAnalysis> createMergeFn(Query<SegmentAnalysis> query)
{
return (arg1, arg2) -> mergeAnalyses(arg1, arg2, ((SegmentMetadataQuery) query).isLenientAggregatorMerge());
return (arg1, arg2) -> mergeAnalyses(
Iterables.getFirst(query.getDataSource().getTableNames(), null),
arg1,
arg2,
((SegmentMetadataQuery) query).isLenientAggregatorMerge()
);
}

@Override
@@ -246,8 +254,9 @@ public <T extends LogicalSegment> List<T> filterSegments(SegmentMetadataQuery qu

@VisibleForTesting
public static SegmentAnalysis mergeAnalyses(
final SegmentAnalysis arg1,
final SegmentAnalysis arg2,
@Nullable String dataSource,
SegmentAnalysis arg1,
SegmentAnalysis arg2,
boolean lenientAggregatorMerge
)
{
@@ -259,6 +268,19 @@ public static SegmentAnalysis mergeAnalyses(
return arg1;
}

// Swap arg1, arg2 so the later-ending interval is first. This ensures we prefer the latest column order.
// We're preserving it so callers can see columns in their natural order.
if (dataSource != null) {
final SegmentId id1 = SegmentId.tryParse(dataSource, arg1.getId());
final SegmentId id2 = SegmentId.tryParse(dataSource, arg2.getId());

if (id1 != null && id2 != null && id2.getIntervalEnd().isAfter(id1.getIntervalEnd())) {
final SegmentAnalysis tmp = arg1;
arg1 = arg2;
arg2 = tmp;
}
}

List<Interval> newIntervals = null;
if (arg1.getIntervals() != null) {
newIntervals = new ArrayList<>(arg1.getIntervals());
@@ -272,7 +294,7 @@

final Map<String, ColumnAnalysis> leftColumns = arg1.getColumns();
final Map<String, ColumnAnalysis> rightColumns = arg2.getColumns();
Map<String, ColumnAnalysis> columns = new TreeMap<>();
final LinkedHashMap<String, ColumnAnalysis> columns = new LinkedHashMap<>();

Set<String> rightColumnNames = Sets.newHashSet(rightColumns.keySet());
for (Map.Entry<String, ColumnAnalysis> entry : leftColumns.entrySet()) {
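The swap above is what makes the preserved order deterministic when two segments disagree: the analysis whose segment interval ends later (recovered from the analysis id via SegmentId.tryParse) is moved into the first position, so its columns are inserted into the LinkedHashMap first and define the merged column order. A rough, self-contained sketch of just that decision, with simplified types and hypothetical names rather than the real Druid signature:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MergeColumnOrderSketch
{
  // Sketch only: the real mergeAnalyses also merges intervals, sizes, row counts and aggregators.
  static LinkedHashMap<String, String> mergeColumns(
      long arg1IntervalEndMillis,
      Map<String, String> arg1Columns,
      long arg2IntervalEndMillis,
      Map<String, String> arg2Columns
  )
  {
    // Swap so the later-ending segment's columns come first and define the merged order.
    if (arg2IntervalEndMillis > arg1IntervalEndMillis) {
      final Map<String, String> tmp = arg1Columns;
      arg1Columns = arg2Columns;
      arg2Columns = tmp;
    }

    final LinkedHashMap<String, String> merged = new LinkedHashMap<>(arg1Columns);
    // Columns that exist only in the older segment are appended after the newer ones.
    arg2Columns.forEach(merged::putIfAbsent);
    return merged;
  }
}
```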
@@ -52,9 +52,9 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -98,7 +98,7 @@ public Sequence<SegmentAnalysis> run(QueryPlus<SegmentAnalysis> inQ, ResponseCon
totalSize = analyzedColumns.size() * numRows;
}

Map<String, ColumnAnalysis> columns = new TreeMap<>();
LinkedHashMap<String, ColumnAnalysis> columns = new LinkedHashMap<>();
ColumnIncluderator includerator = updatedQuery.getToInclude();
for (Map.Entry<String, ColumnAnalysis> entry : analyzedColumns.entrySet()) {
final String columnName = entry.getKey();
@@ -26,6 +26,7 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.joda.time.Interval;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -42,7 +43,12 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
*/
private final String id;
private final List<Interval> interval;
private final Map<String, ColumnAnalysis> columns;

/**
* Require LinkedHashMap to emphasize how important column order is. It's used by DruidSchema to keep
* SQL column order in line with ingestion column order.
*/
private final LinkedHashMap<String, ColumnAnalysis> columns;
private final long size;
private final long numRows;
private final Map<String, AggregatorFactory> aggregators;
@@ -54,7 +60,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
public SegmentAnalysis(
@JsonProperty("id") String id,
@JsonProperty("intervals") List<Interval> interval,
@JsonProperty("columns") Map<String, ColumnAnalysis> columns,
@JsonProperty("columns") LinkedHashMap<String, ColumnAnalysis> columns,
@JsonProperty("size") long size,
@JsonProperty("numRows") long numRows,
@JsonProperty("aggregators") Map<String, AggregatorFactory> aggregators,
@@ -87,7 +93,7 @@ public List<Interval> getIntervals()
}

@JsonProperty
public Map<String, ColumnAnalysis> getColumns()
public LinkedHashMap<String, ColumnAnalysis> getColumns()
{
return columns;
}
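Declaring both the columns field and the @JsonProperty("columns") constructor parameter as LinkedHashMap (rather than Map) also pins down serialization behavior: Jackson writes map entries in iteration order and binds the JSON object back into an insertion-ordered map, so the column order produced on the data servers survives the round trip to the Broker. A small, hedged illustration of that Jackson behavior using made-up column names, not Druid classes:

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.LinkedHashMap;

public class ColumnOrderJsonDemo
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    final String json = "{\"__time\": \"LONG\", \"page\": \"STRING\", \"added\": \"LONG\"}";

    // Binding into a LinkedHashMap keeps keys in the order they appear in the JSON document.
    final LinkedHashMap<String, String> columns =
        mapper.readValue(json, new TypeReference<LinkedHashMap<String, String>>() {});

    System.out.println(columns.keySet()); // prints [__time, page, added]
  }
}
```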
@@ -79,6 +79,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -442,7 +443,7 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, Segmen
{
MMappedIndex index = legacyHandler.mapDir(inDir);

Map<String, Supplier<ColumnHolder>> columns = new HashMap<>();
Map<String, Supplier<ColumnHolder>> columns = new LinkedHashMap<>();

for (String dimension : index.getAvailableDimensions()) {
ColumnBuilder builder = new ColumnBuilder()
@@ -624,7 +625,7 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, Segmen
}
}

Map<String, Supplier<ColumnHolder>> columns = new HashMap<>();
Map<String, Supplier<ColumnHolder>> columns = new LinkedHashMap<>();

// Register the time column
ByteBuffer timeBuffer = smooshedFiles.mapFile("__time");
@@ -20,7 +20,6 @@
package org.apache.druid.segment;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -44,8 +43,9 @@
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

/**
*
@@ -82,8 +82,14 @@ public Indexed<String> getAvailableDimensions()
@Override
public Iterable<String> getAvailableMetrics()
{
HashSet<String> columnNames = Sets.newHashSet(index.getColumnNames());
return Sets.difference(columnNames, Sets.newHashSet(index.getAvailableDimensions()));
// Use LinkedHashSet to preserve the original order.
final Set<String> columnNames = new LinkedHashSet<>(index.getColumnNames());

for (final String dimension : index.getAvailableDimensions()) {
columnNames.remove(dimension);
}

return columnNames;
}

@Override
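The last hunk drops Guava's Sets.difference because the difference of two HashSets iterates in hash order, whereas a LinkedHashSet seeded with all column names and then stripped of the dimensions keeps the original column order. A standalone comparison of the two approaches (illustrative column names; removeAll stands in for the explicit loop in the patch):

```java
import com.google.common.collect.Sets;

import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class MetricOrderDemo
{
  public static void main(String[] args)
  {
    // All column names in storage order, followed by the dimension subset.
    final List<String> columnNames = Arrays.asList("__time", "page", "user", "added", "deleted");
    final List<String> dimensions = Arrays.asList("page", "user");

    // Old approach: difference of two HashSets; iteration order follows the hash order.
    final Set<String> viaHashSets =
        Sets.difference(new HashSet<>(columnNames), new HashSet<>(dimensions));

    // New approach: LinkedHashSet keeps the original column order while the dimensions are removed.
    final Set<String> viaLinkedHashSet = new LinkedHashSet<>(columnNames);
    viaLinkedHashSet.removeAll(dimensions);

    System.out.println("HashSet difference: " + viaHashSets);      // order not guaranteed
    System.out.println("LinkedHashSet:      " + viaLinkedHashSet); // [__time, added, deleted]
  }
}
```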