Add a new flag for ingestion to preserve existing metrics (#12185)
* add impl

* add impl

* fix checkstyle

* add impl

* add unit test

* fix stuff

* fix stuff

* fix stuff

* add unit test

* add more unit tests

* add more unit tests

* add IT

* add IT

* add IT

* add IT

* add ITs

* address comments

* fix test

* fix test

* fix test

* address comments

* address comments

* address comments

* fix conflict

* fix checkstyle

* address comments

* fix test

* fix checkstyle

* fix test

* fix test

* fix IT
maytasm authored Apr 8, 2022
1 parent bf96ddf commit 8edea5a
Showing 26 changed files with 1,358 additions and 79 deletions.
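Reading guide: the new flag in the commit title surfaces in the tuning-config diffs below as an extra appendable-index spec argument, while the InputRowSchema, InputRowSchemas, and DruidInputSource changes thread metric names through ingestion so existing metric columns survive reingestion. A minimal sketch of constructing that spec, assuming the positional boolean (shown in the serde tests only as new OnheapIncrementalIndex.Spec(true)) is the preserve-existing-metrics toggle:

import org.apache.druid.segment.incremental.OnheapIncrementalIndex;

public class PreserveExistingMetricsFlagSketch
{
  public static void main(String[] args)
  {
    // Assumption: the boolean passed positionally in the serde tests in this
    // commit is the flag that preserves existing metrics during ingestion.
    OnheapIncrementalIndex.Spec spec = new OnheapIncrementalIndex.Spec(true);
    System.out.println(spec);
  }
}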
27 changes: 27 additions & 0 deletions core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
@@ -19,9 +19,13 @@

package org.apache.druid.data.input;

import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;

import javax.validation.constraints.NotNull;
import java.util.Set;

/**
* Schema of {@link InputRow}.
*/
@@ -30,16 +34,39 @@ public class InputRowSchema
private final TimestampSpec timestampSpec;
private final DimensionsSpec dimensionsSpec;
private final ColumnsFilter columnsFilter;
/**
* Set of metric names for further downstream processing by {@link InputSource}.
* Empty if no metrics are given.
*/
@NotNull
private final Set<String> metricNames;

public InputRowSchema(
final TimestampSpec timestampSpec,
final DimensionsSpec dimensionsSpec,
final ColumnsFilter columnsFilter
)
{
this(timestampSpec, dimensionsSpec, columnsFilter, ImmutableSet.of());
}

public InputRowSchema(
final TimestampSpec timestampSpec,
final DimensionsSpec dimensionsSpec,
final ColumnsFilter columnsFilter,
final Set<String> metricNames
)
{
this.timestampSpec = timestampSpec;
this.dimensionsSpec = dimensionsSpec;
this.columnsFilter = columnsFilter;
this.metricNames = metricNames == null ? ImmutableSet.of() : metricNames;
}

@NotNull
public Set<String> getMetricNames()
{
return metricNames;
}

public TimestampSpec getTimestampSpec()
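Taken together, the InputRowSchema changes above add a metric-name set next to the existing specs, with the three-argument constructor delegating to the new four-argument one using an empty set, so existing callers keep compiling. A minimal sketch of the new constructor and getter; the spec values and metric names are illustrative, not from the commit:

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;

public class InputRowSchemaMetricNamesSketch
{
  public static void main(String[] args)
  {
    InputRowSchema schema = new InputRowSchema(
        new TimestampSpec("timestamp", "auto", null),
        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("a", "b"))),
        ColumnsFilter.all(),
        ImmutableSet.of("count", "met")  // illustrative metric names
    );
    // A null metricNames argument is normalized to an empty set in the
    // constructor, so this getter never returns null.
    System.out.println(schema.getMetricNames());  // [count, met]
  }
}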
indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
Expand Up @@ -23,12 +23,14 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
@@ -241,8 +243,26 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu

final DruidSegmentInputFormat inputFormat = new DruidSegmentInputFormat(indexIO, dimFilter);

return new InputEntityIteratingReader(
getInputRowSchemaToUse(inputRowSchema),
inputFormat,
entityIterator,
temporaryDirectory
);
}

@VisibleForTesting
InputRowSchema getInputRowSchemaToUse(InputRowSchema inputRowSchema)
{
final InputRowSchema inputRowSchemaToUse;

ColumnsFilter columnsFilterToUse = inputRowSchema.getColumnsFilter();
if (inputRowSchema.getMetricNames() != null) {
for (String metricName : inputRowSchema.getMetricNames()) {
columnsFilterToUse = columnsFilterToUse.plus(metricName);
}
}

if (taskConfig.isIgnoreTimestampSpecForDruidInputSource()) {
// Legacy compatibility mode; see https://github.com/apache/druid/pull/10267.
LOG.warn("Ignoring the provided timestampSpec and reading the __time column instead. To use timestampSpecs with "
@@ -251,10 +271,14 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu
inputRowSchemaToUse = new InputRowSchema(
new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, STANDARD_TIME_COLUMN_FORMATS.iterator().next(), null),
inputRowSchema.getDimensionsSpec(),
-inputRowSchema.getColumnsFilter().plus(ColumnHolder.TIME_COLUMN_NAME)
+columnsFilterToUse.plus(ColumnHolder.TIME_COLUMN_NAME)
);
} else {
-inputRowSchemaToUse = inputRowSchema;
+inputRowSchemaToUse = new InputRowSchema(
+inputRowSchema.getTimestampSpec(),
+inputRowSchema.getDimensionsSpec(),
+columnsFilterToUse
+);
}

if (ColumnHolder.TIME_COLUMN_NAME.equals(inputRowSchemaToUse.getTimestampSpec().getTimestampColumn())
@@ -268,12 +292,7 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu
);
}

-return new InputEntityIteratingReader(
-inputRowSchemaToUse,
-inputFormat,
-entityIterator,
-temporaryDirectory
-);
+return inputRowSchemaToUse;
}

private List<TimelineObjectHolder<String, DataSegment>> createTimeline()
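The extracted getInputRowSchemaToUse method above widens the schema's ColumnsFilter with every metric name before the reader is built, so reading existing segments keeps metric columns instead of filtering them out. A standalone sketch of that widening step, using only the ColumnsFilter calls visible in this commit (the column and metric names are hypothetical):

import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.ColumnsFilter;

public class ColumnsFilterWideningSketch
{
  public static void main(String[] args)
  {
    // Start from an inclusion-based filter, then add each metric column,
    // mirroring the loop at the top of getInputRowSchemaToUse.
    ColumnsFilter filter = ColumnsFilter.inclusionBased(ImmutableSet.of("c1"));
    for (String metricName : ImmutableSet.of("m1")) {
      filter = filter.plus(metricName);
    }
    System.out.println(filter.apply("c1"));     // true: originally included
    System.out.println(filter.apply("m1"));     // true: added by the loop
    System.out.println(filter.apply("other"));  // false: still filtered out
  }
}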
indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
@@ -29,6 +29,7 @@
import org.apache.druid.segment.transform.Transform;
import org.apache.druid.segment.transform.TransformSpec;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
@@ -56,7 +57,10 @@ public static InputRowSchema fromDataSchema(final DataSchema dataSchema)
dataSchema.getDimensionsSpec(),
dataSchema.getTransformSpec(),
dataSchema.getAggregators()
-)
+),
+Arrays.stream(dataSchema.getAggregators())
+.map(AggregatorFactory::getName)
+.collect(Collectors.toSet())
);
}

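InputRowSchemas.fromDataSchema now derives the metric-name set from the data schema's aggregators, so the metric names travel to the input source automatically. The same derivation in isolation, with aggregator names borrowed from the InputRowSchemasTest additions later in this commit:

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

public class MetricNamesFromAggregatorsSketch
{
  public static void main(String[] args)
  {
    AggregatorFactory[] aggregators = new AggregatorFactory[]{
        new CountAggregatorFactory("count"),
        new LongSumAggregatorFactory("met", "met")
    };
    // Each aggregator contributes its output name, exactly as in the new
    // fromDataSchema code above.
    Set<String> metricNames = Arrays.stream(aggregators)
                                    .map(AggregatorFactory::getName)
                                    .collect(Collectors.toSet());
    System.out.println(metricNames);  // [count, met] in some order
  }
}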
indexing-service/src/test/java/org/apache/druid/client/indexing/ClientCompactionTaskQuerySerdeTest.java
@@ -57,6 +57,7 @@
import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -93,6 +94,7 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException
true
),
new ClientCompactionTaskQueryTuningConfig(
null,
null,
40000,
2000L,
Expand Down Expand Up @@ -249,7 +251,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException
new ParallelIndexTuningConfig(
null,
null,
-null,
+new OnheapIncrementalIndex.Spec(true),
40000,
2000L,
null,
@@ -313,6 +315,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException
),
new ClientCompactionTaskQueryTuningConfig(
100,
new OnheapIncrementalIndex.Spec(true),
40000,
2000L,
30000L,
indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
@@ -23,8 +23,13 @@
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -35,12 +40,15 @@
import org.apache.druid.segment.TestHelper;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.Arrays;

public class DruidInputSourceTest
{
private final IndexIO indexIO = EasyMock.createMock(IndexIO.class);
@@ -221,4 +229,74 @@ public void testSerdeUsingNoDataSource() throws Exception

mapper.readValue(json, InputSource.class);
}

@Test
public void testReaderColumnsFilterWithMetricGiven()
{
String datasource = "foo";
Interval interval = Intervals.of("2000/2001");
String column = "c1";
String metricName = "m1";
ColumnsFilter originalColumnsFilter = ColumnsFilter.inclusionBased(ImmutableSet.of(column));
InputRowSchema inputRowSchema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b"))
),
originalColumnsFilter,
ImmutableSet.of(metricName)
);
DruidInputSource druidInputSource = new DruidInputSource(
datasource,
interval,
null,
null,
ImmutableList.of("a"),
ImmutableList.of("b"),
indexIO,
coordinatorClient,
segmentCacheManagerFactory,
retryPolicyFactory,
taskConfig
);
InputRowSchema inputSourceReader = druidInputSource.getInputRowSchemaToUse(inputRowSchema);
ColumnsFilter columnsFilter = inputSourceReader.getColumnsFilter();
Assert.assertTrue(columnsFilter.apply(column));
Assert.assertTrue(columnsFilter.apply(metricName));
}

@Test
public void testReaderColumnsFilterWithNoMetricGiven()
{
String datasource = "foo";
Interval interval = Intervals.of("2000/2001");
String column = "c1";
String metricName = "m1";
ColumnsFilter originalColumnsFilter = ColumnsFilter.inclusionBased(ImmutableSet.of(column));
InputRowSchema inputRowSchema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b"))
),
originalColumnsFilter,
ImmutableSet.of()
);
DruidInputSource druidInputSource = new DruidInputSource(
datasource,
interval,
null,
null,
ImmutableList.of("a"),
ImmutableList.of("b"),
indexIO,
coordinatorClient,
segmentCacheManagerFactory,
retryPolicyFactory,
taskConfig
);
InputRowSchema inputSourceReader = druidInputSource.getInputRowSchemaToUse(inputRowSchema);
ColumnsFilter columnsFilter = inputSourceReader.getColumnsFilter();
Assert.assertTrue(columnsFilter.apply(column));
Assert.assertFalse(columnsFilter.apply(metricName));
}
}
indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java
@@ -23,18 +23,28 @@
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandlingTest;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;

public class InputRowSchemasTest extends NullHandlingTest
{
@Test
@@ -98,4 +108,65 @@ public void test_createColumnsFilter_schemaless()
columnsFilter
);
}

@Test
public void testFromDataSchema()
{
TimestampSpec timestampSpec = new TimestampSpec(null, null, null);
DimensionsSpec dimensionsSpec = new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("d1"),
new StringDimensionSchema("d2"),
new LongDimensionSchema("d3"),
new FloatDimensionSchema("d4"),
new DoubleDimensionSchema("d5")
)
);
DataSchema schema = new DataSchema(
"dataSourceName",
new TimestampSpec(null, null, null),
dimensionsSpec,
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("met", "met")
},
new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
null
);

InputRowSchema inputRowSchema = InputRowSchemas.fromDataSchema(schema);
Assert.assertEquals(timestampSpec, inputRowSchema.getTimestampSpec());
Assert.assertEquals(dimensionsSpec.getDimensions(), inputRowSchema.getDimensionsSpec().getDimensions());
Assert.assertEquals(dimensionsSpec.getDimensionNames(), inputRowSchema.getDimensionsSpec().getDimensionNames());
Assert.assertEquals(ImmutableSet.of("count", "met"), inputRowSchema.getMetricNames());
}

@Test
public void testFromDataSchemaWithNoAggregator()
{
TimestampSpec timestampSpec = new TimestampSpec(null, null, null);
DimensionsSpec dimensionsSpec = new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("d1"),
new StringDimensionSchema("d2"),
new LongDimensionSchema("d3"),
new FloatDimensionSchema("d4"),
new DoubleDimensionSchema("d5")
)
);
DataSchema schema = new DataSchema(
"dataSourceName",
new TimestampSpec(null, null, null),
dimensionsSpec,
new AggregatorFactory[]{},
new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
null
);

InputRowSchema inputRowSchema = InputRowSchemas.fromDataSchema(schema);
Assert.assertEquals(timestampSpec, inputRowSchema.getTimestampSpec());
Assert.assertEquals(dimensionsSpec.getDimensions(), inputRowSchema.getDimensionsSpec().getDimensions());
Assert.assertEquals(dimensionsSpec.getDimensionNames(), inputRowSchema.getDimensionsSpec().getDimensionNames());
Assert.assertEquals(ImmutableSet.of(), inputRowSchema.getMetricNames());
}
}
5 changes: 5 additions & 0 deletions integration-tests/pom.xml
@@ -432,6 +432,11 @@
<version>${aws.sdk.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-java</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>

<build>
@@ -53,6 +53,7 @@ public static DataSourceCompactionConfig createCompactionConfig(
null,
null,
null,
null,
new MaxSizeSplitHintSpec(null, 1),
new DynamicPartitionsSpec(maxRowsPerSegment, null),
null,