Support changing dimension schema in Auto Compaction #11874

Merged
17 commits merged on Nov 9, 2021
@@ -101,6 +101,7 @@ public void setup()
null,
null,
null,
null,
null
)
);
@@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;

import java.util.Map;
@@ -40,6 +41,7 @@
public class CompactionState
{
private final PartitionsSpec partitionsSpec;
private final DimensionsSpec dimensionsSpec;
// org.apache.druid.segment.IndexSpec cannot be used here because it's in the 'processing' module which
// has a dependency on the 'core' module where this class is.
private final Map<String, Object> indexSpec;
@@ -50,11 +52,13 @@ public class CompactionState
@JsonCreator
public CompactionState(
@JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
@JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
@JsonProperty("indexSpec") Map<String, Object> indexSpec,
@JsonProperty("granularitySpec") Map<String, Object> granularitySpec
)
{
this.partitionsSpec = partitionsSpec;
this.dimensionsSpec = dimensionsSpec;
this.indexSpec = indexSpec;
this.granularitySpec = granularitySpec;
}
@@ -65,6 +69,12 @@ public PartitionsSpec getPartitionsSpec()
return partitionsSpec;
}

@JsonProperty
public DimensionsSpec getDimensionsSpec()
{
return dimensionsSpec;
}

@JsonProperty
public Map<String, Object> getIndexSpec()
{
@@ -88,21 +98,23 @@ public boolean equals(Object o)
}
CompactionState that = (CompactionState) o;
return Objects.equals(partitionsSpec, that.partitionsSpec) &&
Objects.equals(dimensionsSpec, that.dimensionsSpec) &&
Objects.equals(indexSpec, that.indexSpec) &&
Objects.equals(granularitySpec, that.granularitySpec);
}

@Override
public int hashCode()
{
return Objects.hash(partitionsSpec, indexSpec, granularitySpec);
return Objects.hash(partitionsSpec, dimensionsSpec, indexSpec, granularitySpec);
}

@Override
public String toString()
{
return "CompactionState{" +
"partitionsSpec=" + partitionsSpec +
", dimensionsSpec=" + dimensionsSpec +
", indexSpec=" + indexSpec +
", granularitySpec=" + granularitySpec +
'}';
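
With this change, a segment's `lastCompactionState` serializes with four fields instead of three. The sketch below is illustrative only, mirroring the fixture used in the tests further down; the dimension entries are shown as plain names for brevity, whereas the actual serialized `dimensionsSpec` may carry full dimension schema objects:

```json
"lastCompactionState": {
  "partitionsSpec": {
    "type": "hashed",
    "numShards": null,
    "partitionDimensions": ["dim1"],
    "partitionFunction": "murmur3_32_abs",
    "maxRowsPerSegment": 100000
  },
  "dimensionsSpec": {
    "dimensions": ["dim1", "bar", "foo"]
  },
  "indexSpec": {},
  "granularitySpec": {}
}
```
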
99 changes: 99 additions & 0 deletions core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
@@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.RangeSet;
import org.apache.druid.TestObjectMapper;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
@@ -120,6 +121,7 @@ public void testV1Serialization() throws Exception
new NumberedShardSpec(3, 0),
new CompactionState(
new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "bar", "foo")), null, null),
ImmutableMap.of(),
ImmutableMap.of()
),
@@ -142,6 +144,7 @@ public void testV1Serialization() throws Exception
Assert.assertEquals(ImmutableMap.of("type", "numbered", "partitionNum", 3, "partitions", 0), objectMap.get("shardSpec"));
Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
Assert.assertEquals(1, objectMap.get("size"));
Assert.assertEquals(4, ((Map) objectMap.get("lastCompactionState")).size());

DataSegment deserializedSegment = MAPPER.readValue(MAPPER.writeValueAsString(segment), DataSegment.class);

@@ -154,6 +157,7 @@ public void testV1Serialization() throws Exception
Assert.assertEquals(segment.getShardSpec(), deserializedSegment.getShardSpec());
Assert.assertEquals(segment.getSize(), deserializedSegment.getSize());
Assert.assertEquals(segment.getId(), deserializedSegment.getId());
Assert.assertEquals(segment.getLastCompactionState(), deserializedSegment.getLastCompactionState());

deserializedSegment = MAPPER.readValue(MAPPER.writeValueAsString(segment), DataSegment.class);
Assert.assertEquals(0, segment.compareTo(deserializedSegment));
@@ -165,6 +169,100 @@ public void testV1Serialization() throws Exception
Assert.assertEquals(segment.hashCode(), deserializedSegment.hashCode());
}

@Test
public void testDeserializationDataSegmentLastCompactionStateWithoutDimensionsSpec() throws Exception
{
final Interval interval = Intervals.of("2011-10-01/2011-10-02");
final ImmutableMap<String, Object> loadSpec = ImmutableMap.of("something", "or_other");
DataSegment segment = new DataSegment(
"something",
interval,
"1",
loadSpec,
Arrays.asList("dim1", "dim2"),
Arrays.asList("met1", "met2"),
new NumberedShardSpec(3, 0),
new CompactionState(
new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")),
null,
ImmutableMap.of(),
ImmutableMap.of()
),
TEST_VERSION,
1
);
String lastCompactionStateWithoutDimensionsSpec = "{"
+ "\"dataSource\": \"something\","
+ "\"interval\": \"2011-10-01T00:00:00.000Z/2011-10-02T00:00:00.000Z\","
+ "\"version\": \"1\","
+ "\"loadSpec\": {"
+ " \"something\": \"or_other\""
+ "},"
+ "\"dimensions\": \"dim1,dim2\","
+ "\"metrics\": \"met1,met2\","
+ "\"shardSpec\": {"
+ " \"type\": \"numbered\","
+ " \"partitionNum\": 3,"
+ " \"partitions\": 0"
+ "},"
+ "\"lastCompactionState\": {"
+ " \"partitionsSpec\": {"
+ " \"type\": \"hashed\","
+ " \"numShards\": null,"
+ " \"partitionDimensions\": [\"dim1\"],"
+ " \"partitionFunction\": \"murmur3_32_abs\","
+ " \"maxRowsPerSegment\": 100000"
+ " },"
+ " \"indexSpec\": {},"
+ " \"granularitySpec\": {}"
+ "},"
+ "\"binaryVersion\": 9,"
+ "\"size\": 1,"
+ "\"identifier\": \"something_2011-10-01T00:00:00.000Z_2011-10-02T00:00:00.000Z_1_3\""
+ "}";

final Map<String, Object> objectMap = MAPPER.readValue(
lastCompactionStateWithoutDimensionsSpec,
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
Assert.assertEquals(11, objectMap.size());
Assert.assertEquals("something", objectMap.get("dataSource"));
Assert.assertEquals(interval.toString(), objectMap.get("interval"));
Assert.assertEquals("1", objectMap.get("version"));
Assert.assertEquals(loadSpec, objectMap.get("loadSpec"));
Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
Assert.assertEquals("met1,met2", objectMap.get("metrics"));
Assert.assertEquals(ImmutableMap.of("type", "numbered", "partitionNum", 3, "partitions", 0), objectMap.get("shardSpec"));
Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
Assert.assertEquals(1, objectMap.get("size"));
Assert.assertEquals(3, ((Map) objectMap.get("lastCompactionState")).size());

DataSegment deserializedSegment = MAPPER.readValue(lastCompactionStateWithoutDimensionsSpec, DataSegment.class);
Assert.assertEquals(segment.getDataSource(), deserializedSegment.getDataSource());
Assert.assertEquals(segment.getInterval(), deserializedSegment.getInterval());
Assert.assertEquals(segment.getVersion(), deserializedSegment.getVersion());
Assert.assertEquals(segment.getLoadSpec(), deserializedSegment.getLoadSpec());
Assert.assertEquals(segment.getDimensions(), deserializedSegment.getDimensions());
Assert.assertEquals(segment.getMetrics(), deserializedSegment.getMetrics());
Assert.assertEquals(segment.getShardSpec(), deserializedSegment.getShardSpec());
Assert.assertEquals(segment.getSize(), deserializedSegment.getSize());
Assert.assertEquals(segment.getId(), deserializedSegment.getId());
Assert.assertEquals(segment.getLastCompactionState(), deserializedSegment.getLastCompactionState());
Assert.assertNotNull(segment.getLastCompactionState());
Assert.assertNull(segment.getLastCompactionState().getDimensionsSpec());
Assert.assertNotNull(deserializedSegment.getLastCompactionState());
Assert.assertNull(deserializedSegment.getLastCompactionState().getDimensionsSpec());

deserializedSegment = MAPPER.readValue(lastCompactionStateWithoutDimensionsSpec, DataSegment.class);
Assert.assertEquals(0, segment.compareTo(deserializedSegment));

deserializedSegment = MAPPER.readValue(lastCompactionStateWithoutDimensionsSpec, DataSegment.class);
Assert.assertEquals(0, deserializedSegment.compareTo(segment));

deserializedSegment = MAPPER.readValue(lastCompactionStateWithoutDimensionsSpec, DataSegment.class);
Assert.assertEquals(segment.hashCode(), deserializedSegment.hashCode());
}

@Test
public void testIdentifier()
{
@@ -232,6 +330,7 @@ public void testWithLastCompactionState()
{
final CompactionState compactionState = new CompactionState(
new DynamicPartitionsSpec(null, null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), null, null),
Collections.singletonMap("test", "map"),
Collections.singletonMap("test2", "map2")
);
14 changes: 12 additions & 2 deletions docs/configuration/index.md
@@ -945,7 +945,8 @@ A description of the compaction config is:
|`skipOffsetFromLatest`|The offset for searching segments to be compacted in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Strongly recommended to set for realtime dataSources. See [Data handling with compaction](../ingestion/compaction.md#data-handling-with-compaction)|no (default = "P1D")|
|`tuningConfig`|Tuning config for compaction tasks. See below [Compaction Task TuningConfig](#automatic-compaction-tuningconfig).|no|
|`taskContext`|[Task context](../ingestion/tasks.md#context) for compaction tasks.|no|
|`granularitySpec`|Custom `granularitySpec` to describe the `segmentGranularity` for the compacted segments. See [Automatic compaction granularitySpec](#automatic-compaction-granularityspec)|No|
|`granularitySpec`|Custom `granularitySpec`. See [Automatic compaction granularitySpec](#automatic-compaction-granularityspec)|No|
|`dimensionsSpec`|Custom `dimensionsSpec`. See [Automatic compaction dimensionsSpec](#automatic-compaction-dimensions-spec)|No|
|`ioConfig`|IO config for compaction tasks. See below [Compaction Task IOConfig](#automatic-compaction-ioconfig).|no|

An example of compaction config is:
@@ -988,7 +989,7 @@ The below is a list of the supported configurations for auto compaction.
|`chatHandlerNumRetries`|Retries for reporting the pushed segments in worker tasks.|no (default = 5)|

###### Automatic compaction granularitySpec
You can optionally use the `granularitySpec` object to configure the segment granularity of the compacted segments.
You can optionally use the `granularitySpec` object to configure the granularity and rollup of the compacted segments.
Review comment (Contributor): nit: These fields are defined below, so it seems redundant to mention them again here.

Reply (Contributor Author): Done


`granularitySpec` takes the following keys:

@@ -998,6 +999,15 @@ You can optionally use the `granularitySpec` object to configure the segment gra
|`queryGranularity`|The resolution of timestamp storage within each segment. Defaults to 'null', which preserves the original query granularity. Accepts all [Query granularity](../querying/granularities.md) values.|No|
|`rollup`|Whether to enable ingestion-time rollup or not. Defaults to 'null', which preserves the original setting. Note that once data is rolled up, individual records can no longer be recovered.|No|
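
For illustration, a `granularitySpec` fragment of an auto compaction config might look like the following (the values are illustrative, not defaults):

```json
"granularitySpec": {
  "segmentGranularity": "DAY",
  "queryGranularity": "HOUR",
  "rollup": true
}
```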

###### Automatic compaction dimensions spec
You can optionally use the `dimensionsSpec` object to configure the dimensions of the compacted segments.

`dimensionsSpec` takes the following keys:

|Field|Description|Required|
|-----|-----------|--------|
|`dimensions`| A list of dimension names or objects. Defaults to 'null', which preserves the original dimensions. Note that setting this will cause segments manually compacted with `dimensionExclusions` to be compacted again.|No|
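
As an illustration, a `dimensionsSpec` fragment in an auto compaction config could pin the compacted segments to an explicit dimension list. The names below are hypothetical; entries may be plain dimension names or dimension schema objects:

```json
"dimensionsSpec": {
  "dimensions": [
    "countryCode",
    "userId",
    { "type": "long", "name": "sessionLength" }
  ]
}
```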

###### Automatic compaction IOConfig

Auto compaction supports a subset of the [IOConfig for Parallel task](../ingestion/native-batch.md).
13 changes: 12 additions & 1 deletion docs/ingestion/compaction.md
@@ -109,7 +109,7 @@ To perform a manual compaction, you submit a compaction task. Compaction tasks m
|`id`|Task id|No|
|`dataSource`|Data source name to compact|Yes|
|`ioConfig`|I/O configuration for compaction task. See [Compaction I/O configuration](#compaction-io-configuration) for details.|Yes|
|`dimensionsSpec`|Custom dimensions spec. The compaction task uses the specified dimensions spec if it exists instead of generating one.|No|
|`dimensionsSpec`|Custom dimensions spec. The compaction task uses the specified dimensions spec if it exists instead of generating one. See [Compaction dimensionsSpec](#compaction-dimensions-spec) for details.|No|
|`metricsSpec`|Custom metrics spec. The compaction task uses the specified metrics spec rather than generating one.|No|
|`segmentGranularity`|When set, the compaction task changes the segment granularity for the given interval. Deprecated. Use `granularitySpec`. |No.|
|`tuningConfig`|[Parallel indexing task tuningConfig](native-batch.md#tuningconfig). `awaitSegmentAvailabilityTimeoutMillis` in the tuning config is not currently supported for compaction tasks. Do not set it to a non-zero value.|No|
@@ -181,6 +181,17 @@ Druid supports two `inputSpec` formats:
|`type`|Task type. Should be `segments`|Yes|
|`segments`|A list of segment IDs|Yes|


### Compaction dimensions spec
You can optionally use the `dimensionsSpec` object to configure the dimensions of the compacted segments.

`dimensionsSpec` takes the following keys:

|Field|Description|Required|
|-----|-----------|--------|
|`dimensions`| A list of dimension names or objects. Cannot have the same column in both `dimensions` and `dimensionExclusions`. Defaults to `null`, which preserves the original dimensions.|No|
|`dimensionExclusions`| The names of dimensions to exclude from compaction. Only names are supported here, not objects. This list is only used if the dimensions list is null or empty; otherwise it is ignored. Defaults to `[]`.|No|
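
As an illustration (the column name is hypothetical), a manual compaction task could drop a single column by leaving `dimensions` unset and listing the column in `dimensionExclusions`:

```json
"dimensionsSpec": {
  "dimensionExclusions": ["debugFlags"]
}
```

Alternatively, setting `dimensions` explicitly defines the compacted dimension list and its order, and `dimensionExclusions` is then ignored.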

### Compaction granularity spec

You can optionally use the `granularitySpec` object to configure the segment granularity and the query granularity of the compacted segments. Their syntax is as follows:
@@ -28,6 +28,7 @@
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
@@ -56,6 +57,8 @@
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.IngestionSpec;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
@@ -479,14 +482,22 @@ public static boolean isGuaranteedRollup(IndexIOConfig ioConfig, IndexTuningConf
public static Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction(
boolean storeCompactionState,
TaskToolbox toolbox,
IndexTuningConfig tuningConfig,
GranularitySpec granularitySpec
IngestionSpec ingestionSpec
)
{
if (storeCompactionState) {
final Map<String, Object> indexSpecMap = tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper());
final Map<String, Object> granularitySpecMap = granularitySpec.asMap(toolbox.getJsonMapper());
final CompactionState compactionState = new CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap, granularitySpecMap);
TuningConfig tuningConfig = ingestionSpec.getTuningConfig();
GranularitySpec granularitySpec = ingestionSpec.getDataSchema().getGranularitySpec();
// We do not need to store dimensionExclusions and spatialDimensions since auto compaction does not support them
DimensionsSpec dimensionsSpec = ingestionSpec.getDataSchema().getDimensionsSpec() == null
? null
: new DimensionsSpec(ingestionSpec.getDataSchema().getDimensionsSpec().getDimensions(), null, null);
final CompactionState compactionState = new CompactionState(
tuningConfig.getPartitionsSpec(),
dimensionsSpec,
tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper()),
granularitySpec.asMap(toolbox.getJsonMapper())
);
return segments -> segments
.stream()
.map(s -> s.withLastCompactionState(compactionState))
@@ -925,8 +925,7 @@ private TaskStatus generateAndPublishSegments(
compactionStateAnnotateFunction(
storeCompactionState,
toolbox,
ingestionSchema.getTuningConfig(),
ingestionSchema.getDataSchema().getGranularitySpec()
ingestionSchema
);

// Probably we can publish atomicUpdateGroup along with segments.
@@ -1051,8 +1051,7 @@ private void publishSegments(
final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction = compactionStateAnnotateFunction(
storeCompactionState,
toolbox,
ingestionSchema.getTuningConfig(),
ingestionSchema.getDataSchema().getGranularitySpec()
ingestionSchema
);

Set<DataSegment> segmentsFoundForDrop = null;