From 2af0ab24252e80cb3c2b9615500cffbfed595791 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Mon, 7 Aug 2023 18:32:48 -0700 Subject: [PATCH] Metric to report time spent fetching and analyzing segments (#14752) * Metric to report time spent fetching and analyzing segments * fix test * spell check * fix tests * checkstyle * remove unused variable * Update docs/operations/metrics.md Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com> * Update docs/operations/metrics.md Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com> * Update docs/operations/metrics.md Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com> --------- Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com> --- docs/operations/metrics.md | 12 ++- .../indexing/common/task/CompactionTask.java | 32 ++++++- .../common/task/CompactionTaskTest.java | 90 +++++++++++++++---- ...stractParallelIndexSupervisorTaskTest.java | 3 + 4 files changed, 113 insertions(+), 24 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index c8918e35170f..6383a4057c44 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -154,7 +154,7 @@ If SQL is enabled, the Broker will emit the following metrics for SQL. ## Ingestion metrics -## General native ingestion metrics +### General native ingestion metrics |Metric|Description|Dimensions|Normal value| |------|-----------|----------|------------| @@ -203,6 +203,14 @@ These metrics apply to the [Kinesis indexing service](../development/extensions- |`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up to max Kinesis retention period in milliseconds. | |`ingest/kinesis/partitionLag/time`|Partition-wise lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `partition`, `tags`|Greater than 0, up to max Kinesis retention period in milliseconds. | +### Compaction metrics + +[Compaction tasks](../data-management/compaction.md) emit the following metrics. + +|Metric|Description|Dimensions|Normal value| +|------|-----------|----------|------------| +|`compact/segmentAnalyzer/fetchAndProcessMillis`|Time taken to fetch and process segments to infer the schema for the compaction task to run.|`dataSource`, `taskId`, `taskType`, `groupId`,`tags`| Varies. A high value indicates compaction tasks will speed up from explicitly setting the data schema. | + ### Other ingestion metrics Streaming ingestion tasks and certain types of @@ -232,7 +240,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em |`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s | |`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds| |`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.| - +|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.| If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0. ## Indexing service diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 108d186b75d9..c068aa1433fb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -75,6 +75,7 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.DimensionHandler; import org.apache.druid.segment.IndexIO; @@ -103,6 +104,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -126,6 +128,8 @@ public class CompactionTask extends AbstractBatchIndexTask { private static final Logger log = new Logger(CompactionTask.class); + private static final Clock UTC_CLOCK = Clock.systemUTC(); + /** * The CompactionTask creates and runs multiple IndexTask instances. When the {@link AppenderatorsManager} @@ -455,6 +459,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception emitCompactIngestionModeMetrics(toolbox.getEmitter(), ioConfig.isDropExisting()); final List ingestionSpecs = createIngestionSchema( + UTC_CLOCK, toolbox, getTaskLockHelper().getLockGranularityToUse(), ioConfig, @@ -465,7 +470,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception metricsSpec, granularitySpec, toolbox.getCoordinatorClient(), - segmentCacheManagerFactory + segmentCacheManagerFactory, + getMetricBuilder() ); final List indexTaskSpecs = IntStream .range(0, ingestionSpecs.size()) @@ -562,6 +568,7 @@ private String createIndexTaskSpecId(int i) */ @VisibleForTesting static List createIngestionSchema( + final Clock clock, final TaskToolbox toolbox, final LockGranularity lockGranularityInUse, final CompactionIOConfig ioConfig, @@ -572,7 +579,8 @@ static List createIngestionSchema( @Nullable final AggregatorFactory[] metricsSpec, @Nullable final ClientCompactionTaskGranularitySpec granularitySpec, final CoordinatorClient coordinatorClient, - final SegmentCacheManagerFactory segmentCacheManagerFactory + final SegmentCacheManagerFactory segmentCacheManagerFactory, + final ServiceMetricEvent.Builder metricBuilder ) throws IOException { final List> timelineSegments = retrieveRelevantTimelineHolders( @@ -628,6 +636,9 @@ static List createIngestionSchema( // creates new granularitySpec and set segmentGranularity Granularity segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity(); final DataSchema dataSchema = createDataSchema( + clock, + toolbox.getEmitter(), + metricBuilder, segmentProvider.dataSource, interval, lazyFetchSegments(segmentsToCompact, toolbox.getSegmentCacheManager(), toolbox.getIndexIO()), @@ -659,6 +670,9 @@ static List createIngestionSchema( } else { // given segment granularity final DataSchema dataSchema = createDataSchema( + clock, + toolbox.getEmitter(), + metricBuilder, segmentProvider.dataSource, JodaUtils.umbrellaInterval( Iterables.transform( @@ -756,6 +770,9 @@ private static List> retrieveRelevantT } private static DataSchema createDataSchema( + Clock clock, + ServiceEmitter emitter, + ServiceMetricEvent.Builder metricBuilder, String dataSource, Interval totalInterval, Iterable>>> segments, @@ -773,8 +790,15 @@ private static DataSchema createDataSchema( dimensionsSpec == null, metricsSpec == null ); - - existingSegmentAnalyzer.fetchAndProcessIfNeeded(); + long start = clock.millis(); + try { + existingSegmentAnalyzer.fetchAndProcessIfNeeded(); + } + finally { + if (emitter != null) { + emitter.emit(metricBuilder.build("compact/segmentAnalyzer/fetchAndProcessMillis", clock.millis() - start)); + } + } final Granularity queryGranularityToUse; if (granularitySpec.getQueryGranularity() == null) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 76ea03a8176e..c7f6168d85da 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -87,6 +87,8 @@ import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.query.CachingEmitter; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory; @@ -144,11 +146,16 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -162,6 +169,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +@RunWith(MockitoJUnitRunner.class) public class CompactionTaskTest { private static final long SEGMENT_SIZE_BYTES = 100; @@ -194,6 +202,8 @@ public class CompactionTaskTest "Conflicting segment granularities found %s(segmentGranularity) and %s(granularitySpec.segmentGranularity).\n" + "Remove `segmentGranularity` and set the `granularitySpec.segmentGranularity` to the expected granularity"; + private static final ServiceMetricEvent.Builder METRIC_BUILDER = new ServiceMetricEvent.Builder(); + private static Map DIMENSIONS; private static List AGGREGATORS; private static List SEGMENTS; @@ -363,15 +373,22 @@ private static CompactionTask.CompactionTuningConfig createTuningConfig() @Rule public ExpectedException expectedException = ExpectedException.none(); + @Mock + private Clock clock; + private CachingEmitter emitter; + @Before public void setup() { final IndexIO testIndexIO = new TestIndexIO(OBJECT_MAPPER, SEGMENT_MAP); + emitter = new CachingEmitter(); toolbox = makeTaskToolbox( new TestTaskActionClient(new ArrayList<>(SEGMENT_MAP.keySet())), testIndexIO, - SEGMENT_MAP + SEGMENT_MAP, + emitter ); + Mockito.when(clock.millis()).thenReturn(0L, 10_000L); segmentCacheManagerFactory = new SegmentCacheManagerFactory(OBJECT_MAPPER); } @@ -931,6 +948,7 @@ public void testSegmentProviderFindSegmentsWithEmptySegmentsThrowException() public void testCreateIngestionSchema() throws IOException { final List ingestionSpecs = CompactionTask.createIngestionSchema( + clock, toolbox, LockGranularity.TIME_CHUNK, new CompactionIOConfig(null, false, null), @@ -941,7 +959,8 @@ public void testCreateIngestionSchema() throws IOException null, null, COORDINATOR_CLIENT, - segmentCacheManagerFactory + segmentCacheManagerFactory, + METRIC_BUILDER ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1004,6 +1023,7 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio null ); final List ingestionSpecs = CompactionTask.createIngestionSchema( + clock, toolbox, LockGranularity.TIME_CHUNK, new CompactionIOConfig(null, false, null), @@ -1014,7 +1034,8 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio null, null, COORDINATOR_CLIENT, - segmentCacheManagerFactory + segmentCacheManagerFactory, + METRIC_BUILDER ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1078,6 +1099,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException null ); final List ingestionSpecs = CompactionTask.createIngestionSchema( + clock, toolbox, LockGranularity.TIME_CHUNK, new CompactionIOConfig(null, false, null), @@ -1088,7 +1110,8 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException null, null, COORDINATOR_CLIENT, - segmentCacheManagerFactory + segmentCacheManagerFactory, + METRIC_BUILDER ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1152,6 +1175,7 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException null ); final List ingestionSpecs = CompactionTask.createIngestionSchema( + clock, toolbox, LockGranularity.TIME_CHUNK, new CompactionIOConfig(null, false, null), @@ -1162,7 +1186,8 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException null, null, COORDINATOR_CLIENT, - segmentCacheManagerFactory + segmentCacheManagerFactory, + METRIC_BUILDER ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1216,6 +1241,7 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti ); final List ingestionSpecs = CompactionTask.createIngestionSchema( + clock, toolbox, LockGranularity.TIME_CHUNK, new CompactionIOConfig(null, false, null), @@ -1226,7 +1252,8 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti null, null, COORDINATOR_CLIENT, - segmentCacheManagerFactory + segmentCacheManagerFactory, + METRIC_BUILDER ); ingestionSpecs.sort( @@ -1260,6 +1287,7 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException }; final List ingestionSpecs = CompactionTask.createIngestionSchema( + clock, toolbox, LockGranularity.TIME_CHUNK, new CompactionIOConfig(null, false, null), @@ -1270,7 +1298,8 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException customMetricsSpec, null, COORDINATOR_CLIENT, - segmentCacheManagerFactory + segmentCacheManagerFactory, + METRIC_BUILDER ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1297,6 +1326,7 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException public void testCreateIngestionSchemaWithCustomSegments() throws IOException { final List ingestionSpecs = CompactionTask.createIngestionSchema( + clock, toolbox, LockGranularity.TIME_CHUNK, new CompactionIOConfig(null, false, null), @@ -1307,7 +1337,8 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException null, null, COORDINATOR_CLIENT, - segmentCacheManagerFactory + segmentCacheManagerFactory, + METRIC_BUILDER ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1340,6 +1371,7 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio // Remove one segment in the middle segments.remove(segments.size() / 2); CompactionTask.createIngestionSchema( + clock, toolbox, LockGranularity.TIME_CHUNK, new CompactionIOConfig(null, false, null), @@ -1350,7 +1382,8 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio null, null, COORDINATOR_CLIENT, - segmentCacheManagerFactory + segmentCacheManagerFactory, + METRIC_BUILDER ); } @@ -1364,6 +1397,7 @@ public void testMissingMetadata() throws IOException indexIO.removeMetadata(Iterables.getFirst(indexIO.getQueryableIndexMap().keySet(), null)); final List segments = new ArrayList<>(SEGMENTS); CompactionTask.createIngestionSchema( + clock, toolbox, LockGranularity.TIME_CHUNK, new CompactionIOConfig(null, false, null), @@ -1374,7 +1408,8 @@ public void testMissingMetadata() throws IOException null, null, COORDINATOR_CLIENT, - segmentCacheManagerFactory + segmentCacheManagerFactory, + METRIC_BUILDER ); } @@ -1400,6 +1435,7 @@ public void testEmptyInterval() public void testSegmentGranularityAndNullQueryGranularity() throws IOException { final List ingestionSpecs = CompactionTask.createIngestionSchema( + clock, toolbox, LockGranularity.TIME_CHUNK, new CompactionIOConfig(null, false, null), @@ -1410,7 +1446,8 @@ public void testSegmentGranularityAndNullQueryGranularity() throws IOException null, new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null, null), COORDINATOR_CLIENT, - segmentCacheManagerFactory + segmentCacheManagerFactory, + METRIC_BUILDER ); final List expectedDimensionsSpec = ImmutableList.of( new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) @@ -1438,6 +1475,7 @@ public void testSegmentGranularityAndNullQueryGranularity() throws IOException public void testQueryGranularityAndNullSegmentGranularity() throws IOException { final List ingestionSpecs = CompactionTask.createIngestionSchema( + clock, toolbox, LockGranularity.TIME_CHUNK, new CompactionIOConfig(null, false, null), @@ -1448,7 +1486,8 @@ public void testQueryGranularityAndNullSegmentGranularity() throws IOException null, new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null), null), COORDINATOR_CLIENT, - segmentCacheManagerFactory + segmentCacheManagerFactory, + METRIC_BUILDER ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1474,6 +1513,7 @@ public void testQueryGranularityAndNullSegmentGranularity() throws IOException public void testQueryGranularityAndSegmentGranularityNonNull() throws IOException { final List ingestionSpecs = CompactionTask.createIngestionSchema( + clock, toolbox, LockGranularity.TIME_CHUNK, new CompactionIOConfig(null, false, null), @@ -1488,7 +1528,8 @@ public void testQueryGranularityAndSegmentGranularityNonNull() throws IOExceptio null ), COORDINATOR_CLIENT, - segmentCacheManagerFactory + segmentCacheManagerFactory, + METRIC_BUILDER ); final List expectedDimensionsSpec = ImmutableList.of( new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) @@ -1510,12 +1551,16 @@ public void testQueryGranularityAndSegmentGranularityNonNull() throws IOExceptio new PeriodGranularity(Period.months(3), null, null), BatchIOConfig.DEFAULT_DROP_EXISTING ); + Assert.assertEquals(10_000L, emitter.getLastEmittedEvent().toMap().get("value")); + Assert.assertEquals("compact/segmentAnalyzer/fetchAndProcessMillis", emitter.getLastEmittedEvent().toMap().get("metric")); + Assert.assertEquals("metrics", emitter.getLastEmittedEvent().getFeed()); } @Test public void testNullGranularitySpec() throws IOException { final List ingestionSpecs = CompactionTask.createIngestionSchema( + clock, toolbox, LockGranularity.TIME_CHUNK, new CompactionIOConfig(null, false, null), @@ -1526,7 +1571,8 @@ public void testNullGranularitySpec() throws IOException null, null, COORDINATOR_CLIENT, - segmentCacheManagerFactory + segmentCacheManagerFactory, + METRIC_BUILDER ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1553,6 +1599,7 @@ public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity throws IOException { final List ingestionSpecs = CompactionTask.createIngestionSchema( + clock, toolbox, LockGranularity.TIME_CHUNK, new CompactionIOConfig(null, false, null), @@ -1563,7 +1610,8 @@ public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity null, new ClientCompactionTaskGranularitySpec(null, null, null), COORDINATOR_CLIENT, - segmentCacheManagerFactory + segmentCacheManagerFactory, + METRIC_BUILDER ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1590,6 +1638,7 @@ public void testGranularitySpecWithNotNullRollup() throws IOException { final List ingestionSpecs = CompactionTask.createIngestionSchema( + clock, toolbox, LockGranularity.TIME_CHUNK, new CompactionIOConfig(null, false, null), @@ -1600,7 +1649,8 @@ public void testGranularitySpecWithNotNullRollup() null, new ClientCompactionTaskGranularitySpec(null, null, true), COORDINATOR_CLIENT, - segmentCacheManagerFactory + segmentCacheManagerFactory, + METRIC_BUILDER ); Assert.assertEquals(6, ingestionSpecs.size()); @@ -1614,6 +1664,7 @@ public void testGranularitySpecWithNullRollup() throws IOException { final List ingestionSpecs = CompactionTask.createIngestionSchema( + clock, toolbox, LockGranularity.TIME_CHUNK, new CompactionIOConfig(null, false, null), @@ -1624,7 +1675,8 @@ public void testGranularitySpecWithNullRollup() null, new ClientCompactionTaskGranularitySpec(null, null, null), COORDINATOR_CLIENT, - segmentCacheManagerFactory + segmentCacheManagerFactory, + METRIC_BUILDER ); Assert.assertEquals(6, ingestionSpecs.size()); for (ParallelIndexIngestionSpec indexIngestionSpec : ingestionSpecs) { @@ -1880,7 +1932,8 @@ public ListenableFuture> fetchUsedSegments( private static TaskToolbox makeTaskToolbox( TaskActionClient taskActionClient, IndexIO indexIO, - Map segments + Map segments, + CachingEmitter emitter ) { final SegmentCacheManager segmentCacheManager = new NoopSegmentCacheManager() @@ -1921,6 +1974,7 @@ public void cleanup(DataSegment segment) .segmentCacheManager(segmentCacheManager) .taskLogPusher(null) .attemptId("1") + .emitter(new ServiceEmitter("service", "host", emitter)) .build(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 94d286ec2b24..a5c77b33bb10 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -76,8 +76,10 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.metadata.EntryExistsException; +import org.apache.druid.query.CachingEmitter; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; @@ -708,6 +710,7 @@ public File getStorageDirectory() .shuffleClient(new LocalShuffleClient(intermediaryDataManager)) .taskLogPusher(null) .attemptId("1") + .emitter(new ServiceEmitter("service", "host", new CachingEmitter())) .build(); }