Allow SegmentAnalyzer to read columns from StorageAdapter, allow SegmentMetadataQuery to query IncrementalIndexSegments on realtime node #1739

Merged (1 commit) on Sep 18, 2015
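
In short: SegmentAnalyzer gains an analyze(StorageAdapter) overload, StorageAdapter grows getNumRows() and getColumnCapabilities(String), and the SegmentMetadataQuery runner falls back to the adapter whenever segment.asQueryableIndex() returns null, as it does for the in-memory IncrementalIndexSegments backing realtime nodes. A minimal sketch of the newly supported call path, adapted from the updated SegmentAnalyzerTest at the bottom of this diff (the QueryRunner wiring is elided; see getSegmentAnalysises there):

// Sketch only: mirrors the test code in this PR; `runner` construction is omitted.
Segment segment = new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), null);
SegmentMetadataQuery query = new SegmentMetadataQuery(
    new LegacyDataSource("test"), QuerySegmentSpecs.create("2011/2012"), null, null, null, false
);
// Before this patch, running against the realtime (incremental) segment yielded an empty sequence.
List<SegmentAnalysis> results = Sequences.toList(
    query.run(runner, new HashMap<String, Object>()),
    Lists.<SegmentAnalysis>newArrayList()
);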
@@ -19,20 +19,26 @@

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;
import com.metamx.common.logger.Logger;
import com.metamx.common.StringUtils;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.segment.QueryableIndex;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ComplexColumn;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class SegmentAnalyzer
@@ -61,7 +67,7 @@ public Map<String, ColumnAnalysis> analyze(QueryableIndex index)

final ColumnAnalysis analysis;
final ValueType type = capabilities.getType();
switch(type) {
switch (type) {
case LONG:
analysis = analyzeLongColumn(column);
break;
@@ -82,7 +88,55 @@ public Map<String, ColumnAnalysis> analyze(QueryableIndex index)
columns.put(columnName, analysis);
}

columns.put(Column.TIME_COLUMN_NAME, lengthBasedAnalysis(index.getColumn(Column.TIME_COLUMN_NAME), NUM_BYTES_IN_TIMESTAMP));
columns.put(
Column.TIME_COLUMN_NAME,
lengthBasedAnalysis(index.getColumn(Column.TIME_COLUMN_NAME), NUM_BYTES_IN_TIMESTAMP)
);

return columns;
}

public Map<String, ColumnAnalysis> analyze(StorageAdapter adapter)
{
Preconditions.checkNotNull(adapter, "Adapter cannot be null");
Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
List<String> columnNames = getStorageAdapterColumnNames(adapter);

int numRows = adapter.getNumRows();
for (String columnName : columnNames) {
final ColumnCapabilities capabilities = adapter.getColumnCapabilities(columnName);
final ColumnAnalysis analysis;

// StorageAdapter doesn't provide a way to get column values, so size is
// not calculated for STRING and COMPLEX columns.
ValueType capType = capabilities.getType();
switch (capType) {
case LONG:
analysis = lengthBasedAnalysisForAdapter(capType.name(), capabilities, numRows, Longs.BYTES);
break;
case FLOAT:
analysis = lengthBasedAnalysisForAdapter(capType.name(), capabilities, numRows, NUM_BYTES_IN_TEXT_FLOAT);
break;
case STRING:
analysis = new ColumnAnalysis(capType.name(), 0, adapter.getDimensionCardinality(columnName), null);
break;
case COMPLEX:
analysis = new ColumnAnalysis(capType.name(), 0, null, null);
break;
default:
log.warn("Unknown column type[%s].", capType);
analysis = ColumnAnalysis.error(String.format("unknown_type_%s", capType));
}

columns.put(columnName, analysis);
}

columns.put(
Column.TIME_COLUMN_NAME,
lengthBasedAnalysisForAdapter(ValueType.LONG.name(), null, numRows, NUM_BYTES_IN_TIMESTAMP)
);

return columns;
}
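
Reviewer note: the adapter path can only estimate sizes arithmetically, since StorageAdapter exposes no raw column values. Roughly what the method above reports, illustrated for a hypothetical 1,000,000-row segment:

// Illustrative numbers only, assuming numRows = 1000000:
// LONG    -> 1000000 * Longs.BYTES (8)          = 8000000 bytes
// FLOAT   -> 1000000 * NUM_BYTES_IN_TEXT_FLOAT  (constant defined in this class)
// STRING  -> size 0; cardinality comes from adapter.getDimensionCardinality(columnName)
// COMPLEX -> size 0; no cardinality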
@@ -154,4 +208,26 @@ public ColumnAnalysis analyzeComplexColumn(Column column)

return new ColumnAnalysis(typeName, size, null, null);
}

private List<String> getStorageAdapterColumnNames(StorageAdapter adapter)
{
Indexed<String> dims = adapter.getAvailableDimensions();
Iterable<String> metrics = adapter.getAvailableMetrics();
Iterable<String> columnNames = Iterables.concat(dims, metrics);
List<String> sortedColumnNames = Lists.newArrayList(columnNames);
Collections.sort(sortedColumnNames);
return sortedColumnNames;
}

private ColumnAnalysis lengthBasedAnalysisForAdapter(
String type, ColumnCapabilities capabilities,
int numRows, final int numBytes
)
{
if (capabilities != null && capabilities.hasMultipleValues()) {
return ColumnAnalysis.error("multi_value");
}
// Cast to long so the estimate cannot overflow int on very large segments.
return new ColumnAnalysis(type, (long) numRows * numBytes, null, null);
}

}
@@ -114,10 +114,6 @@ public SegmentAnalysis apply(SegmentAnalysis arg1, SegmentAnalysis arg2)
return arg1;
}

if (!query.isMerge()) {
throw new ISE("Merging when a merge isn't supposed to happen[%s], [%s]", arg1, arg2);
}

List<Interval> newIntervals = JodaUtils.condenseIntervals(
Iterables.concat(arg1.getIntervals(), arg2.getIntervals())
);
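
(This hunk appears to be SegmentMetadataQueryQueryToolChest.) With the isMerge() sanity check removed, the binary merge function no longer throws when invoked outside an explicit merge and always condenses the two interval lists. For reference, a hedged example of what condenseIntervals does with overlapping inputs (interval literals hypothetical):

// Overlapping or abutting intervals collapse into one:
List<Interval> condensed = JodaUtils.condenseIntervals(
    Arrays.asList(new Interval("2011-01-01/2011-03-01"), new Interval("2011-02-01/2011-04-01"))
);
// condensed -> [2011-01-01/2011-04-01]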
@@ -82,15 +82,17 @@ public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Obj
SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;

final QueryableIndex index = segment.asQueryableIndex();
final Map<String, ColumnAnalysis> analyzedColumns;
long totalSize = 0;
if (index == null) {
return Sequences.empty();
// IncrementalIndexSegments (used by in-memory hydrants in the realtime service) do not have a QueryableIndex
analyzedColumns = analyzer.analyze(segment.asStorageAdapter());
} else {
analyzedColumns = analyzer.analyze(index);
// Initialize with the size of the whitespace, 1 byte per value (one per column per row)
totalSize = analyzedColumns.size() * index.getNumRows();
}

final Map<String, ColumnAnalysis> analyzedColumns = analyzer.analyze(index);

// Initialize with the size of the whitespace, 1 byte per
long totalSize = analyzedColumns.size() * index.getNumRows();

Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
ColumnIncluderator includerator = query.getToInclude();
for (Map.Entry<String, ColumnAnalysis> entry : analyzedColumns.entrySet()) {
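
(This hunk appears to be the SegmentMetadataQuery runner.) This is the behavioral core of the patch: a null QueryableIndex no longer short-circuits to an empty result. An equivalent, condensed form of the new control flow, as a sketch rather than the literal diff:

// index == null identifies an in-memory IncrementalIndexSegment (a realtime hydrant).
final QueryableIndex index = segment.asQueryableIndex();
final Map<String, ColumnAnalysis> analyzedColumns = (index == null)
    ? analyzer.analyze(segment.asStorageAdapter())   // realtime path; totalSize stays 0
    : analyzer.analyze(index);                       // persisted/mmapped path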
@@ -104,6 +104,12 @@ public int getDimensionCardinality(String dimension)
return column.getDictionaryEncoding().getCardinality();
}

@Override
public int getNumRows()
{
return index.getNumRows();
}

@Override
public DateTime getMinTime()
{
@@ -136,6 +142,12 @@ public Capabilities getCapabilities()
return Capabilities.builder().dimensionValuesSorted(true).build();
}

@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
return index.getColumn(column).getCapabilities();
}

@Override
public DateTime getMaxIngestedEventTime()
{
@@ -275,7 +287,10 @@ public void reset()
}

@Override
public DimensionSelector makeDimensionSelector(final String dimension, @Nullable final ExtractionFn extractionFn)
public DimensionSelector makeDimensionSelector(
final String dimension,
@Nullable final ExtractionFn extractionFn
)
{
final Column columnDesc = index.getColumn(dimension);
if (columnDesc == null) {
@@ -296,8 +311,7 @@ public DimensionSelector makeDimensionSelector(final String dimension, @Nullable

if (column == null) {
return NULL_DIMENSION_SELECTOR;
}
else if (columnDesc.getCapabilities().hasMultipleValues()) {
} else if (columnDesc.getCapabilities().hasMultipleValues()) {
return new DimensionSelector()
{
@Override
@@ -325,7 +339,9 @@ public String lookupName(int id)
public int lookupId(String name)
{
if (extractionFn != null) {
throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function");
throw new UnsupportedOperationException(
"cannot perform lookup when applying an extraction function"
);
}
return column.lookupId(name);
}
@@ -388,7 +404,9 @@ public String lookupName(int id)
public int lookupId(String name)
{
if (extractionFn != null) {
throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function");
throw new UnsupportedOperationException(
"cannot perform lookup when applying an extraction function"
);
}
return column.lookupId(name);
}
3 changes: 3 additions & 0 deletions processing/src/main/java/io/druid/segment/StorageAdapter.java
@@ -17,6 +17,7 @@

package io.druid.segment;

import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.Indexed;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -42,5 +43,7 @@ public interface StorageAdapter extends CursorFactory
public DateTime getMinTime();
public DateTime getMaxTime();
public Capabilities getCapabilities();
public ColumnCapabilities getColumnCapabilities(String column);
public int getNumRows();
public DateTime getMaxIngestedEventTime();
}
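
Note that these two interface additions are source-incompatible for external StorageAdapter implementations. A hedged fragment of what an existing custom adapter would now need to add (field names hypothetical):

@Override
public int getNumRows()
{
  return rowCount; // however the backing store tracks its row count
}

@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
  // Behavior for unknown columns is not pinned down by this patch.
  return capabilitiesByColumn.get(column);
}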
@@ -42,6 +42,7 @@
import io.druid.segment.SingleScanTimeDimSelector;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.ListIndexed;
@@ -102,7 +103,7 @@ public Iterable<String> getAvailableMetrics()
@Override
public int getDimensionCardinality(String dimension)
{
if(dimension.equals(Column.TIME_COLUMN_NAME)) {
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
return Integer.MAX_VALUE;
}
IncrementalIndex.DimDim dimDim = index.getDimension(dimension);
@@ -112,6 +113,12 @@ public int getDimensionCardinality(String dimension)
return dimDim.size();
}

@Override
public int getNumRows()
{
return index.size();
}

@Override
public DateTime getMinTime()
{
@@ -130,6 +137,12 @@ public Capabilities getCapabilities()
return Capabilities.builder().dimensionValuesSorted(false).build();
}

@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
return index.getCapabilities(column);
}

@Override
public DateTime getMaxIngestedEventTime()
{
@@ -278,7 +291,10 @@ public void reset()
}

@Override
public DimensionSelector makeDimensionSelector(final String dimension, @Nullable final ExtractionFn extractionFn)
public DimensionSelector makeDimensionSelector(
final String dimension,
@Nullable final ExtractionFn extractionFn
)
{
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn);
@@ -310,7 +326,7 @@ public IndexedInts getRow()
}
}
// check for null entry
if(vals.isEmpty() && dimValLookup.contains(null)){
if (vals.isEmpty() && dimValLookup.contains(null)) {
int id = dimValLookup.getId(null);
if (id < maxId) {
vals.add(id);
@@ -369,7 +385,9 @@ public String lookupName(int id)
public int lookupId(String name)
{
if (extractionFn != null) {
throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function");
throw new UnsupportedOperationException(
"cannot perform lookup when applying an extraction function"
);
}
return dimValLookup.getId(name);
}
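
(This hunk appears to be IncrementalIndexStorageAdapter.) A sketch of how the analyzer consumes the two new overrides on the realtime adapter (the column name is hypothetical):

StorageAdapter adapter = new IncrementalIndexStorageAdapter(index);  // index: an IncrementalIndex
int numRows = adapter.getNumRows();                                  // delegates to index.size()
ColumnCapabilities caps = adapter.getColumnCapabilities("quality");  // delegates to index.getCapabilities(...)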
@@ -44,13 +44,38 @@
public class SegmentAnalyzerTest
{
@Test
public void testIncrementalDoesNotWork() throws Exception
public void testIncrementalWorks() throws Exception
{
final List<SegmentAnalysis> results = getSegmentAnalysises(
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), null)
);

Assert.assertEquals(0, results.size());
Assert.assertEquals(1, results.size());

final SegmentAnalysis analysis = results.get(0);
Assert.assertEquals(null, analysis.getId());

final Map<String, ColumnAnalysis> columns = analysis.getColumns();

Assert.assertEquals(
TestIndex.COLUMNS.length,
columns.size()
); // All columns, including time and the empty/null column

for (String dimension : TestIndex.DIMENSIONS) {
final ColumnAnalysis columnAnalysis = columns.get(dimension);

Assert.assertEquals(dimension, ValueType.STRING.name(), columnAnalysis.getType());
Assert.assertTrue(dimension, columnAnalysis.getCardinality() > 0);
}

for (String metric : TestIndex.METRICS) {
final ColumnAnalysis columnAnalysis = columns.get(metric);

Assert.assertEquals(metric, ValueType.FLOAT.name(), columnAnalysis.getType());
Assert.assertTrue(metric, columnAnalysis.getSize() > 0);
Assert.assertNull(metric, columnAnalysis.getCardinality());
}
}

@Test
@@ -66,7 +91,10 @@ public void testMappedWorks() throws Exception
Assert.assertEquals("test_1", analysis.getId());

final Map<String, ColumnAnalysis> columns = analysis.getColumns();
Assert.assertEquals(TestIndex.COLUMNS.length -1, columns.size()); // All columns including time and excluding empty/null column
Assert.assertEquals(
TestIndex.COLUMNS.length - 1,
columns.size()
); // All columns including time, but excluding the empty/null column

for (String dimension : TestIndex.DIMENSIONS) {
final ColumnAnalysis columnAnalysis = columns.get(dimension);
@@ -107,7 +135,7 @@ private List<SegmentAnalysis> getSegmentAnalysises(Segment index)
final SegmentMetadataQuery query = new SegmentMetadataQuery(
new LegacyDataSource("test"), QuerySegmentSpecs.create("2011/2012"), null, null, null, false
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
return Sequences.toList(query.run(runner, context), Lists.<SegmentAnalysis>newArrayList());
}
}