diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java index b93053e63fe8..4af61c6219ed 100644 --- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java @@ -101,6 +101,7 @@ public void setup() null, null, null, + null, null ) ); diff --git a/core/src/main/java/org/apache/druid/timeline/CompactionState.java b/core/src/main/java/org/apache/druid/timeline/CompactionState.java index 85887179e2c1..fff82349e888 100644 --- a/core/src/main/java/org/apache/druid/timeline/CompactionState.java +++ b/core/src/main/java/org/apache/druid/timeline/CompactionState.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import java.util.Map; @@ -40,6 +41,7 @@ public class CompactionState { private final PartitionsSpec partitionsSpec; + private final DimensionsSpec dimensionsSpec; // org.apache.druid.segment.IndexSpec cannot be used here because it's in the 'processing' module which // has a dependency on the 'core' module where this class is. private final Map indexSpec; @@ -50,11 +52,13 @@ public class CompactionState @JsonCreator public CompactionState( @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, + @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec, @JsonProperty("indexSpec") Map indexSpec, @JsonProperty("granularitySpec") Map granularitySpec ) { this.partitionsSpec = partitionsSpec; + this.dimensionsSpec = dimensionsSpec; this.indexSpec = indexSpec; this.granularitySpec = granularitySpec; } @@ -65,6 +69,12 @@ public PartitionsSpec getPartitionsSpec() return partitionsSpec; } + @JsonProperty + public DimensionsSpec getDimensionsSpec() + { + return dimensionsSpec; + } + @JsonProperty public Map getIndexSpec() { @@ -88,6 +98,7 @@ public boolean equals(Object o) } CompactionState that = (CompactionState) o; return Objects.equals(partitionsSpec, that.partitionsSpec) && + Objects.equals(dimensionsSpec, that.dimensionsSpec) && Objects.equals(indexSpec, that.indexSpec) && Objects.equals(granularitySpec, that.granularitySpec); } @@ -95,7 +106,7 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(partitionsSpec, indexSpec, granularitySpec); + return Objects.hash(partitionsSpec, dimensionsSpec, indexSpec, granularitySpec); } @Override @@ -103,6 +114,7 @@ public String toString() { return "CompactionState{" + "partitionsSpec=" + partitionsSpec + + ", dimensionsSpec=" + dimensionsSpec + ", indexSpec=" + indexSpec + ", granularitySpec=" + granularitySpec + '}'; diff --git a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java index 66c3400cfd29..0ed35da63f5d 100644 --- a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java +++ b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.RangeSet; import org.apache.druid.TestObjectMapper; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; 
import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.java.util.common.DateTimes; @@ -120,6 +121,7 @@ public void testV1Serialization() throws Exception new NumberedShardSpec(3, 0), new CompactionState( new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "bar", "foo")), null, null), ImmutableMap.of(), ImmutableMap.of() ), @@ -142,6 +144,7 @@ public void testV1Serialization() throws Exception Assert.assertEquals(ImmutableMap.of("type", "numbered", "partitionNum", 3, "partitions", 0), objectMap.get("shardSpec")); Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion")); Assert.assertEquals(1, objectMap.get("size")); + Assert.assertEquals(4, ((Map) objectMap.get("lastCompactionState")).size()); DataSegment deserializedSegment = MAPPER.readValue(MAPPER.writeValueAsString(segment), DataSegment.class); @@ -154,6 +157,7 @@ public void testV1Serialization() throws Exception Assert.assertEquals(segment.getShardSpec(), deserializedSegment.getShardSpec()); Assert.assertEquals(segment.getSize(), deserializedSegment.getSize()); Assert.assertEquals(segment.getId(), deserializedSegment.getId()); + Assert.assertEquals(segment.getLastCompactionState(), deserializedSegment.getLastCompactionState()); deserializedSegment = MAPPER.readValue(MAPPER.writeValueAsString(segment), DataSegment.class); Assert.assertEquals(0, segment.compareTo(deserializedSegment)); @@ -165,6 +169,100 @@ public void testV1Serialization() throws Exception Assert.assertEquals(segment.hashCode(), deserializedSegment.hashCode()); } + @Test + public void testDeserializationDataSegmentLastCompactionStateWithoutDimensionsSpec() throws Exception + { + final Interval interval = Intervals.of("2011-10-01/2011-10-02"); + final ImmutableMap loadSpec = ImmutableMap.of("something", "or_other"); + DataSegment segment = new DataSegment( + "something", + interval, + "1", + loadSpec, + Arrays.asList("dim1", "dim2"), + Arrays.asList("met1", "met2"), + new NumberedShardSpec(3, 0), + new CompactionState( + new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")), + null, + ImmutableMap.of(), + ImmutableMap.of() + ), + TEST_VERSION, + 1 + ); + String lastCompactionStateWithoutDimensionsSpec = "{" + + "\"dataSource\": \"something\"," + + "\"interval\": \"2011-10-01T00:00:00.000Z/2011-10-02T00:00:00.000Z\"," + + "\"version\": \"1\"," + + "\"loadSpec\": {" + + " \"something\": \"or_other\"" + + "}," + + "\"dimensions\": \"dim1,dim2\"," + + "\"metrics\": \"met1,met2\"," + + "\"shardSpec\": {" + + " \"type\": \"numbered\"," + + " \"partitionNum\": 3," + + " \"partitions\": 0" + + "}," + + "\"lastCompactionState\": {" + + " \"partitionsSpec\": {" + + " \"type\": \"hashed\"," + + " \"numShards\": null," + + " \"partitionDimensions\": [\"dim1\"]," + + " \"partitionFunction\": \"murmur3_32_abs\"," + + " \"maxRowsPerSegment\": 100000" + + " }," + + " \"indexSpec\": {}," + + " \"granularitySpec\": {}" + + "}," + + "\"binaryVersion\": 9," + + "\"size\": 1," + + "\"identifier\": \"something_2011-10-01T00:00:00.000Z_2011-10-02T00:00:00.000Z_1_3\"" + + "}"; + + final Map objectMap = MAPPER.readValue( + lastCompactionStateWithoutDimensionsSpec, + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ); + Assert.assertEquals(11, objectMap.size()); + Assert.assertEquals("something", objectMap.get("dataSource")); + Assert.assertEquals(interval.toString(), objectMap.get("interval")); + Assert.assertEquals("1", objectMap.get("version")); + 
Assert.assertEquals(loadSpec, objectMap.get("loadSpec")); + Assert.assertEquals("dim1,dim2", objectMap.get("dimensions")); + Assert.assertEquals("met1,met2", objectMap.get("metrics")); + Assert.assertEquals(ImmutableMap.of("type", "numbered", "partitionNum", 3, "partitions", 0), objectMap.get("shardSpec")); + Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion")); + Assert.assertEquals(1, objectMap.get("size")); + Assert.assertEquals(3, ((Map) objectMap.get("lastCompactionState")).size()); + + DataSegment deserializedSegment = MAPPER.readValue(lastCompactionStateWithoutDimensionsSpec, DataSegment.class); + Assert.assertEquals(segment.getDataSource(), deserializedSegment.getDataSource()); + Assert.assertEquals(segment.getInterval(), deserializedSegment.getInterval()); + Assert.assertEquals(segment.getVersion(), deserializedSegment.getVersion()); + Assert.assertEquals(segment.getLoadSpec(), deserializedSegment.getLoadSpec()); + Assert.assertEquals(segment.getDimensions(), deserializedSegment.getDimensions()); + Assert.assertEquals(segment.getMetrics(), deserializedSegment.getMetrics()); + Assert.assertEquals(segment.getShardSpec(), deserializedSegment.getShardSpec()); + Assert.assertEquals(segment.getSize(), deserializedSegment.getSize()); + Assert.assertEquals(segment.getId(), deserializedSegment.getId()); + Assert.assertEquals(segment.getLastCompactionState(), deserializedSegment.getLastCompactionState()); + Assert.assertNotNull(segment.getLastCompactionState()); + Assert.assertNull(segment.getLastCompactionState().getDimensionsSpec()); + Assert.assertNotNull(deserializedSegment.getLastCompactionState()); + Assert.assertNull(deserializedSegment.getLastCompactionState().getDimensionsSpec()); + + deserializedSegment = MAPPER.readValue(lastCompactionStateWithoutDimensionsSpec, DataSegment.class); + Assert.assertEquals(0, segment.compareTo(deserializedSegment)); + + deserializedSegment = MAPPER.readValue(lastCompactionStateWithoutDimensionsSpec, DataSegment.class); + Assert.assertEquals(0, deserializedSegment.compareTo(segment)); + + deserializedSegment = MAPPER.readValue(lastCompactionStateWithoutDimensionsSpec, DataSegment.class); + Assert.assertEquals(segment.hashCode(), deserializedSegment.hashCode()); + } + @Test public void testIdentifier() { @@ -232,6 +330,7 @@ public void testWithLastCompactionState() { final CompactionState compactionState = new CompactionState( new DynamicPartitionsSpec(null, null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), null, null), Collections.singletonMap("test", "map"), Collections.singletonMap("test2", "map2") ); diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 1eba2dc898df..e1a49b3a3326 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -945,7 +945,8 @@ A description of the compaction config is: |`skipOffsetFromLatest`|The offset for searching segments to be compacted in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Strongly recommended to set for realtime dataSources. See [Data handling with compaction](../ingestion/compaction.md#data-handling-with-compaction)|no (default = "P1D")| |`tuningConfig`|Tuning config for compaction tasks. See below [Compaction Task TuningConfig](#automatic-compaction-tuningconfig).|no| |`taskContext`|[Task context](../ingestion/tasks.md#context) for compaction tasks.|no| -|`granularitySpec`|Custom `granularitySpec` to describe the `segmentGranularity` for the compacted segments. 
See [Automatic compaction granularitySpec](#automatic-compaction-granularityspec)|No| +|`granularitySpec`|Custom `granularitySpec`. See [Automatic compaction granularitySpec](#automatic-compaction-granularityspec)|No| +|`dimensionsSpec`|Custom `dimensionsSpec`. See [Automatic compaction dimensionsSpec](#automatic-compaction-dimensions-spec)|No| |`ioConfig`|IO config for compaction tasks. See below [Compaction Task IOConfig](#automatic-compaction-ioconfig).|no| An example of compaction config is: @@ -988,9 +989,6 @@ The below is a list of the supported configurations for auto compaction. |`chatHandlerNumRetries`|Retries for reporting the pushed segments in worker tasks.|no (default = 5)| ###### Automatic compaction granularitySpec -You can optionally use the `granularitySpec` object to configure the segment granularity of the compacted segments. - -`granularitySpec` takes the following keys: |Field|Description|Required| |-----|-----------|--------| @@ -998,6 +996,12 @@ You can optionally use the `granularitySpec` object to configure the segment gra |`queryGranularity`|The resolution of timestamp storage within each segment. Defaults to 'null', which preserves the original query granularity. Accepts all [Query granularity](../querying/granularities.md) values.|No| |`rollup`|Whether to enable ingestion-time rollup or not. Defaults to 'null', which preserves the original setting. Note that once data is rollup, individual records can no longer be recovered. |No| +###### Automatic compaction dimensions spec + +|Field|Description|Required| +|-----|-----------|--------| +|`dimensions`| A list of dimension names or objects. Defaults to 'null', which preserves the original dimensions. Note that setting this will cause segments manually compacted with `dimensionExclusions` to be compacted again.|No| + ###### Automatic compaction IOConfig Auto compaction supports a subset of the [IOConfig for Parallel task](../ingestion/native-batch.md). diff --git a/docs/ingestion/compaction.md b/docs/ingestion/compaction.md index fa63a63f42ee..3762fd590369 100644 --- a/docs/ingestion/compaction.md +++ b/docs/ingestion/compaction.md @@ -109,7 +109,7 @@ To perform a manual compaction, you submit a compaction task. Compaction tasks m |`id`|Task id|No| |`dataSource`|Data source name to compact|Yes| |`ioConfig`|I/O configuration for compaction task. See [Compaction I/O configuration](#compaction-io-configuration) for details.|Yes| -|`dimensionsSpec`|Custom dimensions spec. The compaction task uses the specified dimensions spec if it exists instead of generating one.|No| +|`dimensionsSpec`|Custom dimensions spec. The compaction task uses the specified dimensions spec if it exists instead of generating one. See [Compaction dimensionsSpec](#compaction-dimensions-spec) for details.|No| |`metricsSpec`|Custom metrics spec. The compaction task uses the specified metrics spec rather than generating one.|No| |`segmentGranularity`|When set, the compaction task changes the segment granularity for the given interval. Deprecated. Use `granularitySpec`. |No.| |`tuningConfig`|[Parallel indexing task tuningConfig](native-batch.md#tuningconfig). `awaitSegmentAvailabilityTimeoutMillis` in the tuning config is not currently supported for compaction tasks. Do not set it to a non-zero value.|No| @@ -181,6 +181,17 @@ Druid supports two supported `inputSpec` formats: |`type`|Task type. 
Should be `segments`|Yes| |`segments`|A list of segment IDs|Yes| + +### Compaction dimensions spec +You can optionally use the `dimensionsSpec` object to configure the dimensions of the compacted segments. + +`dimensionsSpec` takes the following keys: + +|Field|Description|Required| +|-----|-----------|--------| +|`dimensions`| A list of dimension names or objects. Cannot have the same column in both `dimensions` and `dimensionExclusions`. Defaults to `null`, which preserves the original dimensions.|No| +|`dimensionExclusions`| The names of dimensions to exclude from compaction. Only names are supported here, not objects. This list is only used if the dimensions list is null or empty; otherwise it is ignored. Defaults to `[]`.|No| + ### Compaction granularity spec You can optionally use the `granularitySpec` object to configure the segment granularity and the query granularity of the compacted segments. Their syntax is as follows: diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index a2892210dae9..7faaa096c4bf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -28,6 +28,7 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLock; @@ -56,6 +57,8 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.IngestionSpec; +import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; @@ -479,14 +482,22 @@ public static boolean isGuaranteedRollup(IndexIOConfig ioConfig, IndexTuningConf public static Function, Set> compactionStateAnnotateFunction( boolean storeCompactionState, TaskToolbox toolbox, - IndexTuningConfig tuningConfig, - GranularitySpec granularitySpec + IngestionSpec ingestionSpec ) { if (storeCompactionState) { - final Map indexSpecMap = tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper()); - final Map granularitySpecMap = granularitySpec.asMap(toolbox.getJsonMapper()); - final CompactionState compactionState = new CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap, granularitySpecMap); + TuningConfig tuningConfig = ingestionSpec.getTuningConfig(); + GranularitySpec granularitySpec = ingestionSpec.getDataSchema().getGranularitySpec(); + // We do not need to store dimensionExclusions and spatialDimensions since auto compaction does not support them + DimensionsSpec dimensionsSpec = ingestionSpec.getDataSchema().getDimensionsSpec() == null + ? 
null + : new DimensionsSpec(ingestionSpec.getDataSchema().getDimensionsSpec().getDimensions(), null, null); + final CompactionState compactionState = new CompactionState( + tuningConfig.getPartitionsSpec(), + dimensionsSpec, + tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper()), + granularitySpec.asMap(toolbox.getJsonMapper()) + ); return segments -> segments .stream() .map(s -> s.withLastCompactionState(compactionState)) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index a993b539076d..ba59ba14c813 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -925,8 +925,7 @@ private TaskStatus generateAndPublishSegments( compactionStateAnnotateFunction( storeCompactionState, toolbox, - ingestionSchema.getTuningConfig(), - ingestionSchema.getDataSchema().getGranularitySpec() + ingestionSchema ); // Probably we can publish atomicUpdateGroup along with segments. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index ec5a5df9fa83..c185d71d6294 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -1051,8 +1051,7 @@ private void publishSegments( final Function, Set> annotateFunction = compactionStateAnnotateFunction( storeCompactionState, toolbox, - ingestionSchema.getTuningConfig(), - ingestionSchema.getDataSchema().getGranularitySpec() + ingestionSchema ); Set segmentsFoundForDrop = null; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index 8d78d320949b..c1a3363840fb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -27,6 +27,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.indexing.ClientCompactionIOConfig; import org.apache.druid.client.indexing.ClientCompactionIntervalSpec; +import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec; import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec; import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; @@ -34,6 +35,7 @@ import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.NoopIndexingServiceClient; import org.apache.druid.data.input.SegmentsSplitHintSpec; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.guice.GuiceAnnotationIntrospector; import org.apache.druid.guice.GuiceInjectableValues; import org.apache.druid.guice.GuiceInjectors; @@ -117,6 +119,7 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException 100 ), new 
ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true), + new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), ImmutableMap.of("key", "value") ); @@ -211,6 +214,10 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException task.getIoConfig().isDropExisting() ); Assert.assertEquals(query.getContext(), task.getContext()); + Assert.assertEquals( + query.getDimensionsSpec().getDimensions(), + task.getDimensionsSpec().getDimensions() + ); } @Test @@ -269,6 +276,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException ) ) .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true)) + .dimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), ImmutableList.of("__time", "val"), null)) .build(); final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery( @@ -312,6 +320,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException 100 ), new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true), + new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), new HashMap<>() ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index e9ec43458777..c4f96c292942 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -158,6 +158,7 @@ public void testRunParallelWithDynamicPartitioningMatchCompactionState() throws // Expect compaction state to exist as store compaction state by default CompactionState expectedState = new CompactionState( new DynamicPartitionsSpec(null, Long.MAX_VALUE), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null), compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), getObjectMapper().readValue( getObjectMapper().writeValueAsString( @@ -198,6 +199,7 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() throws Exc Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass()); CompactionState expectedState = new CompactionState( new HashedPartitionsSpec(null, 3, null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null), compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), getObjectMapper().readValue( getObjectMapper().writeValueAsString( @@ -238,6 +240,7 @@ public void testRunParallelWithRangePartitioning() throws Exception Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass()); CompactionState expectedState = new CompactionState( new SingleDimensionPartitionsSpec(7, null, "dim", false), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null), compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), getObjectMapper().readValue( getObjectMapper().writeValueAsString( @@ -281,6 +284,7 @@ public void testRunParallelWithMultiDimensionRangePartitioning() throws Exceptio Assert.assertSame(DimensionRangeShardSpec.class, segment.getShardSpec().getClass()); CompactionState 
expectedState = new CompactionState( new DimensionRangePartitionsSpec(7, null, Arrays.asList("dim1", "dim2"), false), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null), compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), getObjectMapper().readValue( getObjectMapper().writeValueAsString( @@ -321,6 +325,7 @@ public void testRunParallelWithRangePartitioningWithSingleTask() throws Exceptio Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass()); CompactionState expectedState = new CompactionState( new SingleDimensionPartitionsSpec(7, null, "dim", false), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null), compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), getObjectMapper().readValue( getObjectMapper().writeValueAsString( @@ -364,6 +369,7 @@ public void testRunParallelWithMultiDimensionRangePartitioningWithSingleTask() t Assert.assertSame(DimensionRangeShardSpec.class, segment.getShardSpec().getClass()); CompactionState expectedState = new CompactionState( new DimensionRangePartitionsSpec(7, null, Arrays.asList("dim1", "dim2"), false), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null), compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), getObjectMapper().readValue( getObjectMapper().writeValueAsString( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index eebd87592e22..b1e3c0370e99 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -194,6 +194,7 @@ public static CompactionState getDefaultCompactionState(Granularity segmentGranu // Expected compaction state to exist after compaction as we store compaction state by default return new CompactionState( new DynamicPartitionsSpec(5000000, Long.MAX_VALUE), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null), mapper.readValue(mapper.writeValueAsString(new IndexSpec()), Map.class), mapper.readValue( mapper.writeValueAsString( @@ -338,6 +339,7 @@ public void testRunWithHashPartitioning() throws Exception ); CompactionState expectedState = new CompactionState( new HashedPartitionsSpec(null, 3, null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null), compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), getObjectMapper().readValue( getObjectMapper().writeValueAsString( diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java index 5ccd5f412396..6dcf7904216f 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java @@ -69,6 +69,7 @@ public static DataSourceCompactionConfig createCompactionConfig( 1 ), null, + null, new UserCompactionTaskIOConfig(true), null ); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 157b8d5fdc4e..c7e5634b23a1 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -24,6 +24,7 @@ import com.google.inject.Inject; import org.apache.commons.io.IOUtils; import org.apache.druid.data.input.MaxSizeSplitHintSpec; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; @@ -38,6 +39,7 @@ import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig; import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; @@ -76,6 +78,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest private static final Logger LOG = new Logger(ITAutoCompactionTest.class); private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json"; private static final String INDEX_TASK_WITH_GRANULARITY_SPEC = "/indexer/wikipedia_index_task_with_granularity_spec.json"; + private static final String INDEX_TASK_WITH_DIMENSION_SPEC = "/indexer/wikipedia_index_task_with_dimension_spec.json"; private static final String INDEX_ROLLUP_QUERIES_RESOURCE = "/indexer/wikipedia_index_rollup_queries.json"; private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000; @@ -175,7 +178,7 @@ public void testAutoCompactionDutyCanUpdateCompactionConfig() throws Exception LOG.info("Auto compaction test with hash partitioning"); final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null); - submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, false); + submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, null, false); // 2 segments published per day after compaction. 
forceTriggerAutoCompaction(4); verifyQuery(INDEX_QUERIES_RESOURCE); @@ -190,7 +193,7 @@ public void testAutoCompactionDutyCanUpdateCompactionConfig() throws Exception "city", false ); - submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, false); + submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, null, false); forceTriggerAutoCompaction(2); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(rangePartitionsSpec, 2); @@ -679,6 +682,51 @@ public void testAutoCompactionDutyWithQueryGranularity() throws Exception } } + @Test + public void testAutoCompactionDutyWithDimensionsSpec() throws Exception + { + // Index data with dimensions "page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", + // "namespace", "continent", "country", "region", "city" + loadData(INDEX_TASK_WITH_DIMENSION_SPEC); + try (final Closeable ignored = unloader(fullDatasourceName)) { + final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); + intervalsBeforeCompaction.sort(null); + // 4 segments across 2 days (4 total)... + verifySegmentsCount(4); + + // Result is not rolled up + Map expectedResult = ImmutableMap.of( + "%%EXPECTED_COUNT_RESULT%%", 2, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + + // Compact and change dimensions to only "language" + submitCompactionConfig( + MAX_ROWS_PER_SEGMENT_COMPACTED, + NO_SKIP_OFFSET, + null, + new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))), + false + ); + forceTriggerAutoCompaction(2); + + // Result should roll up on the language dimension + expectedResult = ImmutableMap.of( + "%%EXPECTED_COUNT_RESULT%%", 1, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516.0)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); + + List compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName); + // Verify compacted segments do not get compacted again + forceTriggerAutoCompaction(2); + List compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName); + Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size()); + } + } + private void loadData(String indexTask) throws Exception { loadData(indexTask, ImmutableMap.of()); @@ -752,7 +800,12 @@ private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffset private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec, boolean dropExisting) throws Exception { - submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1, granularitySpec, dropExisting); + submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, granularitySpec, null, dropExisting); + } + + private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec, UserCompactionTaskDimensionsConfig dimensionsSpec, boolean dropExisting) throws Exception + { + submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1, granularitySpec, dimensionsSpec, dropExisting); + } private void submitCompactionConfig( @@ -760,6 +813,7 @@ private void
submitCompactionConfig( Period skipOffsetFromLatest, int maxNumConcurrentSubTasks, UserCompactionTaskGranularityConfig granularitySpec, + UserCompactionTaskDimensionsConfig dimensionsSpec, boolean dropExisting ) throws Exception { @@ -789,6 +843,7 @@ private void submitCompactionConfig( 1 ), granularitySpec, + dimensionsSpec, !dropExisting ? null : new UserCompactionTaskIOConfig(true), null ); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java index a4b1ed609cb4..0ca16cffef64 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java @@ -97,6 +97,7 @@ public void testUpgradeAutoCompactionConfigurationWhenConfigurationFromOlderVers 1 ), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null), + null, new UserCompactionTaskIOConfig(true), null ); diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task_with_dimension_spec.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task_with_dimension_spec.json new file mode 100644 index 000000000000..1fa8b4eba321 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task_with_dimension_spec.json @@ -0,0 +1,86 @@ +{ + "type": "index", + "spec": { + "dataSchema": { + "dataSource": "%%DATASOURCE%%", + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + }, + { + "name": "thetaSketch", + "type": "thetaSketch", + "fieldName": "user" + }, + { + "name": "quantilesDoublesSketch", + "type": "quantilesDoublesSketch", + "fieldName": "delta" + }, + { + "name": "HLLSketchBuild", + "type": "HLLSketchBuild", + "fieldName": "user" + } + ], + "granularitySpec": { + "segmentGranularity": "DAY", + "queryGranularity": "DAY", + "intervals" : [ "2013-08-31/2013-09-02" ] + }, + "parser": { + "parseSpec": { + "format" : "json", + "timestampSpec": { + "column": "timestamp" + }, + "dimensionsSpec": { + "dimensions": [ + "page", + {"type": "string", "name": "language", "createBitmapIndex": false}, + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city" + ] + } + } + } + }, + "ioConfig": { + "type": "index", + "firehose": { + "type": "local", + "baseDir": "/resources/data/batch_index/json", + "filter": "wikipedia_index_data*" + } + }, + "tuningConfig": { + "type": "index", + "maxRowsPerSegment": 3, + "awaitSegmentAvailabilityTimeoutMillis": %%SEGMENT_AVAIL_TIMEOUT_MILLIS%% + } + } +} \ No newline at end of file diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpec.java new file mode 100644 index 000000000000..4937bcea961a --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpec.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client.indexing; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.java.util.common.parsers.ParserUtils; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Spec containing dimension configs for a compaction task. + * This class mimics the JSON field names of the corresponding fields in + * {@link org.apache.druid.data.input.impl.DimensionsSpec}, so that end users can set dimension configs for a + * compaction task with the same syntax / JSON structure they would use for any other ingestion task. + */ +public class ClientCompactionTaskDimensionsSpec +{ + @Nullable private final List dimensions; + + @JsonCreator + public ClientCompactionTaskDimensionsSpec( + @Nullable @JsonProperty("dimensions") List dimensions + ) + { + if (dimensions != null) { + List dimensionNames = dimensions.stream().map(DimensionSchema::getName).collect(Collectors.toList()); + ParserUtils.validateFields(dimensionNames); + } + this.dimensions = dimensions; + } + + @Nullable + @JsonProperty + public List getDimensions() + { + return dimensions; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClientCompactionTaskDimensionsSpec that = (ClientCompactionTaskDimensionsSpec) o; + return Objects.equals(dimensions, that.dimensions); + } + + @Override + public int hashCode() + { + return Objects.hash(dimensions); + } + + @Override + public String toString() + { + return "ClientCompactionTaskDimensionsSpec{" + + "dimensions=" + dimensions + + '}'; + } +} diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java index 62e39d62610f..89e5049bac98 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java @@ -39,6 +39,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery private final ClientCompactionIOConfig ioConfig; private final ClientCompactionTaskQueryTuningConfig tuningConfig; private final ClientCompactionTaskGranularitySpec granularitySpec; + private final ClientCompactionTaskDimensionsSpec dimensionsSpec; private final Map context; @JsonCreator @@ -48,6 +49,7 @@ public ClientCompactionTaskQuery( @JsonProperty("ioConfig") ClientCompactionIOConfig ioConfig, @JsonProperty("tuningConfig")
ClientCompactionTaskQueryTuningConfig tuningConfig, @JsonProperty("granularitySpec") ClientCompactionTaskGranularitySpec granularitySpec, + @JsonProperty("dimensionsSpec") ClientCompactionTaskDimensionsSpec dimensionsSpec, @JsonProperty("context") Map context ) { @@ -56,6 +58,7 @@ public ClientCompactionTaskQuery( this.ioConfig = ioConfig; this.tuningConfig = tuningConfig; this.granularitySpec = granularitySpec; + this.dimensionsSpec = dimensionsSpec; this.context = context; } @@ -98,13 +101,18 @@ public ClientCompactionTaskGranularitySpec getGranularitySpec() return granularitySpec; } + @JsonProperty + public ClientCompactionTaskDimensionsSpec getDimensionsSpec() + { + return dimensionsSpec; + } + @JsonProperty public Map getContext() { return context; } - @Override public boolean equals(Object o) { @@ -120,13 +128,14 @@ public boolean equals(Object o) Objects.equals(ioConfig, that.ioConfig) && Objects.equals(tuningConfig, that.tuningConfig) && Objects.equals(granularitySpec, that.granularitySpec) && + Objects.equals(dimensionsSpec, that.dimensionsSpec) && Objects.equals(context, that.context); } @Override public int hashCode() { - return Objects.hash(id, dataSource, ioConfig, tuningConfig, granularitySpec, context); + return Objects.hash(id, dataSource, ioConfig, tuningConfig, granularitySpec, dimensionsSpec, context); } @Override @@ -138,6 +147,7 @@ public String toString() ", ioConfig=" + ioConfig + ", tuningConfig=" + tuningConfig + ", granularitySpec=" + granularitySpec + + ", dimensionsSpec=" + dimensionsSpec + ", context=" + context + '}'; } diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java index afab3b536544..c2f80a0ee636 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java @@ -81,6 +81,7 @@ public String compactSegments( int compactionTaskPriority, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, @Nullable ClientCompactionTaskGranularitySpec granularitySpec, + @Nullable ClientCompactionTaskDimensionsSpec dimensionsSpec, @Nullable Boolean dropExisting, @Nullable Map context ) @@ -103,6 +104,7 @@ public String compactSegments( new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments), dropExisting), tuningConfig, granularitySpec, + dimensionsSpec, context ); return runTask(taskId, taskQuery); diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java index 84f9f554681b..d24998988976 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java @@ -41,6 +41,7 @@ String compactSegments( int compactionTaskPriority, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, @Nullable ClientCompactionTaskGranularitySpec granularitySpec, + @Nullable ClientCompactionTaskDimensionsSpec dimensionsSpec, @Nullable Boolean dropExisting, @Nullable Map context ); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java index dfe1f5775de2..33d804306185 100644 --- 
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java @@ -47,6 +47,7 @@ public class DataSourceCompactionConfig private final Period skipOffsetFromLatest; private final UserCompactionTaskQueryTuningConfig tuningConfig; private final UserCompactionTaskGranularityConfig granularitySpec; + private final UserCompactionTaskDimensionsConfig dimensionsSpec; private final UserCompactionTaskIOConfig ioConfig; private final Map taskContext; @@ -59,6 +60,7 @@ public DataSourceCompactionConfig( @JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest, @JsonProperty("tuningConfig") @Nullable UserCompactionTaskQueryTuningConfig tuningConfig, @JsonProperty("granularitySpec") @Nullable UserCompactionTaskGranularityConfig granularitySpec, + @JsonProperty("dimensionsSpec") @Nullable UserCompactionTaskDimensionsConfig dimensionsSpec, @JsonProperty("ioConfig") @Nullable UserCompactionTaskIOConfig ioConfig, @JsonProperty("taskContext") @Nullable Map taskContext ) @@ -75,6 +77,7 @@ public DataSourceCompactionConfig( this.tuningConfig = tuningConfig; this.ioConfig = ioConfig; this.granularitySpec = granularitySpec; + this.dimensionsSpec = dimensionsSpec; this.taskContext = taskContext; } @@ -131,6 +134,13 @@ public UserCompactionTaskGranularityConfig getGranularitySpec() return granularitySpec; } + @JsonProperty + @Nullable + public UserCompactionTaskDimensionsConfig getDimensionsSpec() + { + return dimensionsSpec; + } + @JsonProperty @Nullable public Map getTaskContext() @@ -155,6 +165,7 @@ public boolean equals(Object o) Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) && Objects.equals(tuningConfig, that.tuningConfig) && Objects.equals(granularitySpec, that.granularitySpec) && + Objects.equals(dimensionsSpec, that.dimensionsSpec) && Objects.equals(ioConfig, that.ioConfig) && Objects.equals(taskContext, that.taskContext); } @@ -170,6 +181,7 @@ public int hashCode() skipOffsetFromLatest, tuningConfig, granularitySpec, + dimensionsSpec, ioConfig, taskContext ); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfig.java new file mode 100644 index 000000000000..bee7f3553b1f --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfig.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.java.util.common.parsers.ParserUtils; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Spec containing dimension configs for a compaction task. + * This class mimics the JSON field names of the corresponding fields in + * {@link org.apache.druid.data.input.impl.DimensionsSpec}, so that end users can set dimension configs for a + * compaction task with the same syntax / JSON structure they would use for any other ingestion task. + */ +public class UserCompactionTaskDimensionsConfig +{ + @Nullable private final List dimensions; + + @JsonCreator + public UserCompactionTaskDimensionsConfig( + @Nullable @JsonProperty("dimensions") List dimensions + ) + { + if (dimensions != null) { + List dimensionNames = dimensions.stream().map(DimensionSchema::getName).collect(Collectors.toList()); + ParserUtils.validateFields(dimensionNames); + } + this.dimensions = dimensions; + } + + @Nullable + @JsonProperty + public List getDimensions() + { + return dimensions; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UserCompactionTaskDimensionsConfig that = (UserCompactionTaskDimensionsConfig) o; + return Objects.equals(dimensions, that.dimensions); + } + + @Override + public int hashCode() + { + return Objects.hash(dimensions); + } + + @Override + public String toString() + { + return "UserCompactionTaskDimensionsConfig{" + + "dimensions=" + dimensions + + '}'; + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index b58910aba153..adf3c96978ec 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.inject.Inject; +import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec; import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec; import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; @@ -341,16 +342,26 @@ private CoordinatorStats doRun( snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size()); final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName); - ClientCompactionTaskGranularitySpec queryGranularitySpec; + // Create granularitySpec to send to compaction task + ClientCompactionTaskGranularitySpec granularitySpec; if (config.getGranularitySpec() != null) { - queryGranularitySpec = new ClientCompactionTaskGranularitySpec( + granularitySpec = new ClientCompactionTaskGranularitySpec( config.getGranularitySpec().getSegmentGranularity(), config.getGranularitySpec().getQueryGranularity(), config.getGranularitySpec().isRollup() ); } else { - queryGranularitySpec = null; + granularitySpec = null; + } + // Create dimensionsSpec to
send to compaction task + ClientCompactionTaskDimensionsSpec dimensionsSpec; + if (config.getDimensionsSpec() != null) { + dimensionsSpec = new ClientCompactionTaskDimensionsSpec( + config.getDimensionsSpec().getDimensions() + ); + } else { + dimensionsSpec = null; } Boolean dropExisting = null; @@ -364,7 +375,8 @@ private CoordinatorStats doRun( segmentsToCompact, config.getTaskPriority(), ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()), - queryGranularitySpec, + granularitySpec, + dimensionsSpec, dropExisting, newAutoCompactionContext(config.getTaskContext()) ); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java index 4225fecc7f09..11dd2c406be6 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java @@ -26,6 +26,8 @@ import com.google.common.collect.Maps; import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.java.util.common.DateTimes; @@ -424,6 +426,11 @@ private boolean needsCompaction(DataSourceCompactionConfig config, SegmentsToCom existingGranularitySpec.isRollup() : null; if (existingRollup == null || !config.getGranularitySpec().isRollup().equals(existingRollup)) { + log.info( + "Configured rollup[%s] is different from the rollup[%s] of segments. Needs compaction", + config.getGranularitySpec().isRollup(), + existingRollup + ); return true; } } @@ -435,6 +442,27 @@ private boolean needsCompaction(DataSourceCompactionConfig config, SegmentsToCom existingGranularitySpec.getQueryGranularity() : null; if (!config.getGranularitySpec().getQueryGranularity().equals(existingQueryGranularity)) { + log.info( + "Configured queryGranularity[%s] is different from the queryGranularity[%s] of segments. Needs compaction", + config.getGranularitySpec().getQueryGranularity(), + existingQueryGranularity + ); + return true; + } + } + } + + if (config.getDimensionsSpec() != null) { + final DimensionsSpec existingDimensionsSpec = lastCompactionState.getDimensionsSpec(); + // Checks for list of dimensions + if (config.getDimensionsSpec().getDimensions() != null) { + final List existingDimensions = existingDimensionsSpec != null ? + existingDimensionsSpec.getDimensions() : + null; + if (!config.getDimensionsSpec().getDimensions().equals(existingDimensions)) { + log.info( + "Configured dimensionsSpec is different from the dimensionsSpec of segments. Needs compaction" + ); return true; } } diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpecTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpecTest.java new file mode 100644 index 000000000000..6b2df99f3dff --- /dev/null +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpecTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client.indexing; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class ClientCompactionTaskDimensionsSpecTest +{ + @Test + public void testEquals() + { + EqualsVerifier.forClass(ClientCompactionTaskDimensionsSpec.class) + .withPrefabValues( + DimensionSchema.class, + new StringDimensionSchema("bar", DimensionSchema.MultiValueHandling.ofDefault(), true), + new StringDimensionSchema("foo", DimensionSchema.MultiValueHandling.ofDefault(), true) + ) + .usingGetClass() + .verify(); + } + + @Test + public void testSerde() throws IOException + { + final ClientCompactionTaskDimensionsSpec expected = new ClientCompactionTaskDimensionsSpec( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")) + ); + final ObjectMapper mapper = new ObjectMapper(); + final byte[] json = mapper.writeValueAsBytes(expected); + final ClientCompactionTaskDimensionsSpec fromJson = (ClientCompactionTaskDimensionsSpec) mapper.readValue( + json, + ClientCompactionTaskDimensionsSpec.class + ); + Assert.assertEquals(expected, fromJson); + } + + @Test(expected = ParseException.class) + public void testInvalidDimensionsField() + { + final ClientCompactionTaskDimensionsSpec expected = new ClientCompactionTaskDimensionsSpec( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim", "dim")) + ); + } +} diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java index 2035dbae76e9..5bc7b87b0fce 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java @@ -51,6 +51,7 @@ public String compactSegments( int compactionTaskPriority, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, @Nullable ClientCompactionTaskGranularitySpec granularitySpec, + @Nullable ClientCompactionTaskDimensionsSpec dimensionsSpec, @Nullable Boolean dropExisting, @Nullable Map context ) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java index 4f8fd12635cc..27732ef487e5 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java +++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java @@ -20,8 +20,10 @@ package org.apache.druid.server.coordinator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.data.input.SegmentsSplitHintSpec; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.HumanReadableBytes; @@ -59,6 +61,7 @@ public void testSerdeBasic() throws IOException null, null, null, + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -86,6 +89,7 @@ public void testSerdeWithMaxRowsPerSegment() throws IOException null, null, null, + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -130,6 +134,7 @@ public void testSerdeWithMaxTotalRows() throws IOException ), null, null, + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -174,6 +179,7 @@ public void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException ), null, null, + null, ImmutableMap.of("key", "val") ); @@ -240,6 +246,7 @@ public void testSerdeGranularitySpec() throws IOException null, new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), null, + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -267,6 +274,7 @@ public void testSerdeGranularitySpecWithQueryGranularity() throws Exception null, new UserCompactionTaskGranularityConfig(null, Granularities.YEAR, null), null, + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -297,6 +305,7 @@ public void testSerdeWithNullGranularitySpec() throws IOException null, null, null, + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -324,6 +333,7 @@ public void testSerdeGranularitySpecWithNullValues() throws IOException null, new UserCompactionTaskGranularityConfig(null, null, null), null, + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -351,6 +361,7 @@ public void testSerdeGranularitySpecWithRollup() throws IOException null, new UserCompactionTaskGranularityConfig(null, null, true), null, + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -380,6 +391,7 @@ public void testSerdeIOConfigWithNonNullDropExisting() throws IOException new Period(3600), null, new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), + null, new UserCompactionTaskIOConfig(true), ImmutableMap.of("key", "val") ); @@ -408,6 +420,7 @@ public void testSerdeIOConfigWithNullDropExisting() throws IOException new Period(3600), null, new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), + null, new UserCompactionTaskIOConfig(null), ImmutableMap.of("key", "val") ); @@ -424,4 +437,32 @@ public void testSerdeIOConfigWithNullDropExisting() throws IOException Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec()); Assert.assertEquals(config.getIoConfig(), fromJson.getIoConfig()); } + + @Test + public void testSerdeDimensionsSpec() throws IOException + { + final DataSourceCompactionConfig config = new DataSourceCompactionConfig( 
+ "dataSource", + null, + 500L, + null, + new Period(3600), + null, + null, + new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))), + null, + ImmutableMap.of("key", "val") + ); + final String json = OBJECT_MAPPER.writeValueAsString(config); + final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); + + Assert.assertEquals(config.getDataSource(), fromJson.getDataSource()); + Assert.assertEquals(25, fromJson.getTaskPriority()); + Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes()); + Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment()); + Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest()); + Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig()); + Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext()); + Assert.assertEquals(config.getDimensionsSpec(), fromJson.getDimensionsSpec()); + } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfigTest.java new file mode 100644 index 000000000000..4862b4530eac --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfigTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class UserCompactionTaskDimensionsConfigTest +{ + @Test + public void testEquals() + { + EqualsVerifier.forClass(UserCompactionTaskDimensionsConfig.class) + .withPrefabValues( + DimensionSchema.class, + new StringDimensionSchema("bar", DimensionSchema.MultiValueHandling.ofDefault(), true), + new StringDimensionSchema("foo", DimensionSchema.MultiValueHandling.ofDefault(), true) + ) + .usingGetClass() + .verify(); + } + + @Test + public void testSerde() throws IOException + { + final UserCompactionTaskDimensionsConfig expected = new UserCompactionTaskDimensionsConfig( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")) + ); + final ObjectMapper mapper = new ObjectMapper(); + final byte[] json = mapper.writeValueAsBytes(expected); + final UserCompactionTaskDimensionsConfig fromJson = (UserCompactionTaskDimensionsConfig) mapper.readValue( + json, + UserCompactionTaskDimensionsConfig.class + ); + Assert.assertEquals(expected, fromJson); + } + + @Test(expected = ParseException.class) + public void testInvalidDimensionsField() + { + final UserCompactionTaskDimensionsConfig expected = new UserCompactionTaskDimensionsConfig( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim", "dim")) + ); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 494f567a47f2..2da165abb502 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -30,6 +30,7 @@ import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.indexing.ClientCompactionIOConfig; import org.apache.druid.client.indexing.ClientCompactionIntervalSpec; +import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec; import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec; import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; @@ -38,6 +39,7 @@ import org.apache.druid.client.indexing.IndexingWorker; import org.apache.druid.client.indexing.IndexingWorkerInfo; import org.apache.druid.client.indexing.TaskPayloadResponse; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; @@ -66,6 +68,7 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; import 
org.apache.druid.server.coordinator.UserCompactionTaskIOConfig; import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; @@ -387,12 +390,12 @@ public void testMakeStatsForDataSourceWithCompactedIntervalBetweenNonCompactedIn DataSegment afterNoon = createSegment(dataSourceName, j, false, k); if (j == 3) { // Make two intervals on this day compacted (two compacted intervals back-to-back) - beforeNoon = beforeNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of())); - afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of())); + beforeNoon = beforeNoon.withLastCompactionState(new CompactionState(partitionsSpec, null, ImmutableMap.of(), ImmutableMap.of())); + afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, null, ImmutableMap.of(), ImmutableMap.of())); } if (j == 1) { // Make one interval on this day compacted - afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of())); + afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, null, ImmutableMap.of(), ImmutableMap.of())); } segments.add(beforeNoon); segments.add(afterNoon); @@ -671,6 +674,7 @@ public void testCompactWithoutGranularitySpec() ), null, null, + null, null ) ); @@ -685,6 +689,7 @@ public void testCompactWithoutGranularitySpec() ArgumentMatchers.any(), granularitySpecArgumentCaptor.capture(), ArgumentMatchers.any(), + ArgumentMatchers.any(), ArgumentMatchers.any() ); // Only the same amount of segments as the original PARTITION_PER_TIME_INTERVAL since segment granulartity is the same @@ -726,6 +731,7 @@ public void testCompactWithNotNullIOConfig() null ), null, + null, new UserCompactionTaskIOConfig(true), null ) @@ -738,6 +744,7 @@ public void testCompactWithNotNullIOConfig() ArgumentMatchers.anyInt(), ArgumentMatchers.any(), ArgumentMatchers.any(), + ArgumentMatchers.any(), dropExistingCapture.capture(), ArgumentMatchers.any() ); @@ -779,6 +786,7 @@ public void testCompactWithNullIOConfig() ), null, null, + null, null ) ); @@ -790,6 +798,7 @@ public void testCompactWithNullIOConfig() ArgumentMatchers.anyInt(), ArgumentMatchers.any(), ArgumentMatchers.any(), + ArgumentMatchers.any(), dropExistingCapture.capture(), ArgumentMatchers.any() ); @@ -831,6 +840,7 @@ public void testCompactWithGranularitySpec() ), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null), null, + null, null ) ); @@ -845,6 +855,7 @@ public void testCompactWithGranularitySpec() ArgumentMatchers.any(), granularitySpecArgumentCaptor.capture(), ArgumentMatchers.any(), + ArgumentMatchers.any(), ArgumentMatchers.any() ); // All segments is compact at the same time since we changed the segment granularity to YEAR and all segment @@ -856,6 +867,119 @@ public void testCompactWithGranularitySpec() Assert.assertEquals(expected, actual); } + @Test + public void testCompactWithDimensionSpec() + { + final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); + final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient); + final List compactionConfigs = new ArrayList<>(); + final String dataSource = DATA_SOURCE_PREFIX + 0; + compactionConfigs.add( + new DataSourceCompactionConfig( + dataSource, + 0, + 500L, + null, + new Period("PT0H"), // smaller than segment interval + new 
UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            null,
+            new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
+            null,
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor dimensionsSpecArgumentCaptor = ArgumentCaptor.forClass(
+        ClientCompactionTaskDimensionsSpec.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyInt(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        dimensionsSpecArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+    );
+    ClientCompactionTaskDimensionsSpec actual = dimensionsSpecArgumentCaptor.getValue();
+    Assert.assertNotNull(actual);
+    Assert.assertEquals(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), actual.getDimensions());
+  }
+
+  @Test
+  public void testCompactWithoutDimensionSpec()
+  {
+    final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final List compactionConfigs = new ArrayList<>();
+    final String dataSource = DATA_SOURCE_PREFIX + 0;
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSource,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            null,
+            null,
+            null,
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor dimensionsSpecArgumentCaptor = ArgumentCaptor.forClass(
+        ClientCompactionTaskDimensionsSpec.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyInt(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        dimensionsSpecArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+    );
+    ClientCompactionTaskDimensionsSpec actual = dimensionsSpecArgumentCaptor.getValue();
+    Assert.assertNull(actual);
+  }
+
   @Test
   public void testCompactWithRollupInGranularitySpec()
   {
@@ -891,6 +1015,7 @@
         ),
         new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, true),
         null,
+        null,
         null
       )
     );
@@ -905,6 +1030,7 @@
         ArgumentMatchers.any(),
         granularitySpecArgumentCaptor.capture(),
         ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
         ArgumentMatchers.any()
     );
     Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size());
@@ -947,6 +1073,7 @@ public void testCompactWithGranularitySpecConflictWithActiveCompactionTask()
         ),
         null,
         new ClientCompactionTaskGranularitySpec(Granularities.DAY, null, null),
+        null,
         null
       )
     );
@@ -983,6 +1110,7 @@
         ),
         new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null),
         null,
+        null,
         null
       )
     );
@@ -1002,6 +1130,7 @@
         ArgumentMatchers.any(),
         granularitySpecArgumentCaptor.capture(),
         ArgumentMatchers.any(),
+
ArgumentMatchers.any(), ArgumentMatchers.any() ); // All segments is compact at the same time since we changed the segment granularity to YEAR and all segment @@ -1400,6 +1529,7 @@ private List createCompactionConfigs(@Nullable Integ ), null, null, + null, null ) ); @@ -1502,7 +1632,7 @@ private StringFullResponseHolder handleTask(Request request) throws IOException compactSegments( timeline, segments, - compactionTaskQuery.getTuningConfig() + compactionTaskQuery ); return createStringFullResponseHolder(jsonMapper.writeValueAsString(ImmutableMap.of("task", taskQuery.getId()))); } @@ -1515,7 +1645,7 @@ private StringFullResponseHolder handleLockedIntervals() throws IOException private void compactSegments( VersionedIntervalTimeline timeline, List segments, - ClientCompactionTaskQueryTuningConfig tuningConfig + ClientCompactionTaskQuery clientCompactionTaskQuery ) { Preconditions.checkArgument(segments.size() > 1); @@ -1539,13 +1669,13 @@ private void compactSegments( final String version = "newVersion_" + compactVersionSuffix++; final long segmentSize = segments.stream().mapToLong(DataSegment::getSize).sum() / 2; final PartitionsSpec compactionPartitionsSpec; - if (tuningConfig.getPartitionsSpec() instanceof DynamicPartitionsSpec) { + if (clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec() instanceof DynamicPartitionsSpec) { compactionPartitionsSpec = new DynamicPartitionsSpec( - tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(), - ((DynamicPartitionsSpec) tuningConfig.getPartitionsSpec()).getMaxTotalRowsOr(Long.MAX_VALUE) + clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec().getMaxRowsPerSegment(), + ((DynamicPartitionsSpec) clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec()).getMaxTotalRowsOr(Long.MAX_VALUE) ); } else { - compactionPartitionsSpec = tuningConfig.getPartitionsSpec(); + compactionPartitionsSpec = clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec(); } for (int i = 0; i < 2; i++) { @@ -1559,6 +1689,11 @@ private void compactSegments( shardSpecFactory.apply(i, 2), new CompactionState( compactionPartitionsSpec, + clientCompactionTaskQuery.getDimensionsSpec() == null ? 
null : new DimensionsSpec( + clientCompactionTaskQuery.getDimensionsSpec().getDimensions(), + null, + null + ), ImmutableMap.of( "bitmap", ImmutableMap.of("type", "roaring", "compressRunOnSerialization", true), diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java index a7d1d37be16f..674c7f2447af 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java @@ -232,6 +232,7 @@ public void testRunRemoveInactiveDatasourceCompactionConfig() null, new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), null, + null, ImmutableMap.of("key", "val") ); @@ -244,6 +245,7 @@ public void testRunRemoveInactiveDatasourceCompactionConfig() null, new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), null, + null, ImmutableMap.of("key", "val") ); CoordinatorCompactionConfig originalCurrentConfig = CoordinatorCompactionConfig.from(ImmutableList.of(inactiveDatasourceConfig, activeDatasourceConfig)); @@ -348,6 +350,7 @@ public void testRunRetryForRetryableException() null, new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), null, + null, ImmutableMap.of("key", "val") ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java index 092bc9f4386e..1dc690826aa0 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java @@ -93,6 +93,7 @@ public void testFindPartitionsSpecFromConfigWithNullTuningConfigReturnDynamicPar null, null, null, + null, null ); Assert.assertEquals( @@ -133,6 +134,7 @@ public void testFindPartitionsSpecFromConfigWithNullMaxTotalRowsReturnLongMaxVal ), null, null, + null, null ); Assert.assertEquals( @@ -173,6 +175,7 @@ public void testFindPartitionsSpecFromConfigWithNonNullMaxTotalRowsReturnGivenVa ), null, null, + null, null ); Assert.assertEquals( @@ -213,6 +216,7 @@ public void testFindPartitionsSpecFromConfigWithNonNullMaxRowsPerSegmentReturnGi ), null, null, + null, null ); Assert.assertEquals( @@ -253,6 +257,7 @@ public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndMa ), null, null, + null, null ); Assert.assertEquals( @@ -293,6 +298,7 @@ public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndPa ), null, null, + null, null ); Assert.assertEquals( @@ -333,6 +339,7 @@ public void testFindPartitionsSpecFromConfigWithDeprecatedMaxTotalRowsAndPartiti ), null, null, + null, null ); Assert.assertEquals( @@ -373,6 +380,7 @@ public void testFindPartitionsSpecFromConfigWithHashPartitionsSpec() ), null, null, + null, null ); Assert.assertEquals( @@ -413,6 +421,7 @@ public void testFindPartitionsSpecFromConfigWithRangePartitionsSpec() ), null, null, + null, null ); Assert.assertEquals( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java index a202b6dd1a00..82a6573c9e54 100644 --- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; @@ -37,6 +38,7 @@ import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.data.ConciseBitmapSerdeFactory; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; @@ -696,13 +698,13 @@ public void testIteratorReturnsNothingAsSegmentsWasCompactedAndHaveSameSegmentGr Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), new Period("P1D"), null, - new CompactionState(partitionsSpec, indexSpec, null) + new CompactionState(partitionsSpec, null, indexSpec, null) ), new SegmentGenerateSpec( Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"), new Period("P1D"), null, - new CompactionState(partitionsSpec, indexSpec, null) + new CompactionState(partitionsSpec, null, indexSpec, null) ) ); @@ -729,13 +731,13 @@ public void testIteratorReturnsNothingAsSegmentsWasCompactedAndHaveSameSegmentGr Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), new Period("P1D"), null, - new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("segmentGranularity", "day")) + new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("segmentGranularity", "day")) ), new SegmentGenerateSpec( Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"), new Period("P1D"), null, - new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("segmentGranularity", "day")) + new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("segmentGranularity", "day")) ) ); @@ -762,13 +764,13 @@ public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentSeg Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), new Period("P1D"), null, - new CompactionState(partitionsSpec, indexSpec, null) + new CompactionState(partitionsSpec, null, indexSpec, null) ), new SegmentGenerateSpec( Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"), new Period("P1D"), null, - new CompactionState(partitionsSpec, indexSpec, null) + new CompactionState(partitionsSpec, null, indexSpec, null) ) ); @@ -805,13 +807,13 @@ public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentSeg Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), new Period("P1D"), null, - new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("segmentGranularity", "day")) + new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("segmentGranularity", "day")) ), new SegmentGenerateSpec( Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"), new Period("P1D"), null, - new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("segmentGranularity", "day")) + new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("segmentGranularity", "day")) ) ); @@ -848,7 +850,7 @@ public void 
testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentTim Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"), new Period("P1D"), null, - new CompactionState(partitionsSpec, indexSpec, null) + new CompactionState(partitionsSpec, null, indexSpec, null) ) ); @@ -900,7 +902,7 @@ public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentOri Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"), new Period("P1D"), null, - new CompactionState(partitionsSpec, indexSpec, null) + new CompactionState(partitionsSpec, null, indexSpec, null) ) ); @@ -954,19 +956,19 @@ public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentRol Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), new Period("P1D"), null, - new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("rollup", "false")) + new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("rollup", "false")) ), new SegmentGenerateSpec( Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"), new Period("P1D"), null, - new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("rollup", "true")) + new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("rollup", "true")) ), new SegmentGenerateSpec( Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"), new Period("P1D"), null, - new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of()) + new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of()) ) ); @@ -1014,19 +1016,19 @@ public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentQue Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), new Period("P1D"), null, - new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("queryGranularity", "day")) + new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("queryGranularity", "day")) ), new SegmentGenerateSpec( Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"), new Period("P1D"), null, - new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("queryGranularity", "minute")) + new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("queryGranularity", "minute")) ), new SegmentGenerateSpec( Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"), new Period("P1D"), null, - new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of()) + new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of()) ) ); @@ -1057,6 +1059,100 @@ public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentQue Assert.assertFalse(iterator.hasNext()); } + @Test + public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentDimensions() + { + // Same indexSpec as what is set in the auto compaction config + Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); + // Same partitionsSpec as what is set in the auto compaction config + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + + // Create segments that were compacted (CompactionState != null) and have + // Dimensions=["foo", "bar"] for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, + // Dimensions=["foo"] for interval 2017-10-02T00:00:00/2017-10-03T00:00:00, + // Dimensions=null for interval 2017-10-03T00:00:00/2017-10-04T00:00:00 (dimensions was not set during last compaction) + // and dimensionsSpec=null for interval 2017-10-04T00:00:00/2017-10-05T00:00:00 (dimensionsSpec was not set during last compaction) + final 
VersionedIntervalTimeline timeline = createTimeline(
+        new SegmentGenerateSpec(
+            Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
+            new Period("P1D"),
+            null,
+            new CompactionState(partitionsSpec, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), null, null), indexSpec, null)
+        ),
+        new SegmentGenerateSpec(
+            Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
+            new Period("P1D"),
+            null,
+            new CompactionState(partitionsSpec, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo")), null, null), indexSpec, null)
+        ),
+        new SegmentGenerateSpec(
+            Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"),
+            new Period("P1D"),
+            null,
+            new CompactionState(partitionsSpec, new DimensionsSpec(null, null, null), indexSpec, null)
+        ),
+        new SegmentGenerateSpec(
+            Intervals.of("2017-10-04T00:00:00/2017-10-05T00:00:00"),
+            new Period("P1D"),
+            null,
+            new CompactionState(partitionsSpec, null, indexSpec, null)
+        )
+    );
+
+    // Auto compaction config sets Dimensions=["foo"]
+    CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
+            130000,
+            new Period("P0D"),
+            null,
+            new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo")))
+        )),
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+    // We should get interval 2017-10-01T00:00:00/2017-10-02T00:00:00, interval 2017-10-04T00:00:00/2017-10-05T00:00:00, and interval 2017-10-03T00:00:00/2017-10-04T00:00:00.
+    Assert.assertTrue(iterator.hasNext());
+    List expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-04T00:00:00/2017-10-05T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next())
+    );
+    Assert.assertTrue(iterator.hasNext());
+    expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next())
+    );
+    Assert.assertTrue(iterator.hasNext());
+    expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next())
+    );
+    // No more
+    Assert.assertFalse(iterator.hasNext());
+
+    // Auto compaction config sets Dimensions=null
+    iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
+            130000,
+            new Period("P0D"),
+            null,
+            new UserCompactionTaskDimensionsConfig(null)
+        )),
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+    // No more
+    Assert.assertFalse(iterator.hasNext());
+  }
+
   @Test
   public void testIteratorReturnsSegmentsSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline()
   {
@@ -1101,7 +1197,7 @@ public void testIteratorReturnsSegmentsAsCompactionStateChangedWithCompactedStat
         Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
         new Period("P1D"),
         null,
-        new CompactionState(partitionsSpec, newIndexSpecMap, null)
+        new CompactionState(partitionsSpec, null, newIndexSpecMap, null)
       )
     );
@@ -1233,6 +1329,16 @@ private DataSourceCompactionConfig createCompactionConfig(
       Period skipOffsetFromLatest,
UserCompactionTaskGranularityConfig granularitySpec ) + { + return createCompactionConfig(inputSegmentSizeBytes, skipOffsetFromLatest, granularitySpec, null); + } + + private DataSourceCompactionConfig createCompactionConfig( + long inputSegmentSizeBytes, + Period skipOffsetFromLatest, + UserCompactionTaskGranularityConfig granularitySpec, + UserCompactionTaskDimensionsConfig dimensionsSpec + ) { return new DataSourceCompactionConfig( DATA_SOURCE, @@ -1242,6 +1348,7 @@ private DataSourceCompactionConfig createCompactionConfig( skipOffsetFromLatest, null, granularitySpec, + dimensionsSpec, null, null ); diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java index 353550de46c6..fc16507cedfd 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java @@ -57,6 +57,7 @@ public class CoordinatorCompactionConfigsResourceTest null, new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), null, + null, ImmutableMap.of("key", "val") ); private static final byte[] OLD_CONFIG_IN_BYTES = {1, 2, 3}; @@ -152,6 +153,7 @@ public void testAddOrUpdateCompactionConfigWithExistingConfig() null, new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, true), null, + null, ImmutableMap.of("key", "val") ); String author = "maytas"; @@ -192,6 +194,7 @@ public void testDeleteCompactionConfigWithExistingConfig() null, new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), null, + null, ImmutableMap.of("key", "val") ); final CoordinatorCompactionConfig originalConfig = CoordinatorCompactionConfig.from(ImmutableList.of(toDelete)); @@ -313,6 +316,7 @@ public void testAddOrUpdateCompactionConfigWithoutExistingConfig() null, new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), null, + null, ImmutableMap.of("key", "val") ); String author = "maytas"; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 39368557d49a..72740a57c5b3 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -275,6 +275,7 @@ public void setUp() throws Exception private final CompactionState expectedCompactionState = new CompactionState( new DynamicPartitionsSpec(null, null), + null, Collections.singletonMap("test", "map"), Collections.singletonMap("test2", "map2") ); diff --git a/website/.spelling b/website/.spelling index c2b9dd99f7d2..9d474f7f6eec 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1775,6 +1775,7 @@ cpuacct dataSourceName datetime defaultHistory +dimensionsSpec doubleMax doubleMin doubleSum