Skip to content

Commit

Permalink
Metric to report time spent fetching and analyzing segments (#14752)
Browse files Browse the repository at this point in the history
* Metric to report time spent fetching and analyzing segments

* fix test

* spell check

* fix tests

* checkstyle

* remove unused variable

* Update docs/operations/metrics.md

Co-authored-by: Katya Macedo  <[email protected]>

* Update docs/operations/metrics.md

Co-authored-by: Katya Macedo  <[email protected]>

* Update docs/operations/metrics.md

Co-authored-by: Katya Macedo  <[email protected]>

---------

Co-authored-by: Katya Macedo <[email protected]>
  • Loading branch information
suneet-s and ektravel authored Aug 8, 2023
1 parent bff8f9e commit 2af0ab2
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 24 deletions.
12 changes: 10 additions & 2 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ If SQL is enabled, the Broker will emit the following metrics for SQL.

## Ingestion metrics

## General native ingestion metrics
### General native ingestion metrics

|Metric|Description|Dimensions|Normal value|
|------|-----------|----------|------------|
Expand Down Expand Up @@ -203,6 +203,14 @@ These metrics apply to the [Kinesis indexing service](../development/extensions-
|`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up to max Kinesis retention period in milliseconds. |
|`ingest/kinesis/partitionLag/time`|Partition-wise lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `partition`, `tags`|Greater than 0, up to max Kinesis retention period in milliseconds. |

### Compaction metrics

[Compaction tasks](../data-management/compaction.md) emit the following metrics.

|Metric|Description|Dimensions|Normal value|
|------|-----------|----------|------------|
|`compact/segmentAnalyzer/fetchAndProcessMillis`|Time taken to fetch and process segments to infer the schema for the compaction task to run.|`dataSource`, `taskId`, `taskType`, `groupId`,`tags`| Varies. A high value indicates compaction tasks will speed up from explicitly setting the data schema. |

### Other ingestion metrics

Streaming ingestion tasks and certain types of
Expand Down Expand Up @@ -232,7 +240,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s |
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.|

|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.|
If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0.

## Indexing service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.DimensionHandler;
import org.apache.druid.segment.IndexIO;
Expand Down Expand Up @@ -103,6 +104,7 @@
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -126,6 +128,8 @@
public class CompactionTask extends AbstractBatchIndexTask
{
private static final Logger log = new Logger(CompactionTask.class);
private static final Clock UTC_CLOCK = Clock.systemUTC();


/**
* The CompactionTask creates and runs multiple IndexTask instances. When the {@link AppenderatorsManager}
Expand Down Expand Up @@ -455,6 +459,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
emitCompactIngestionModeMetrics(toolbox.getEmitter(), ioConfig.isDropExisting());

final List<ParallelIndexIngestionSpec> ingestionSpecs = createIngestionSchema(
UTC_CLOCK,
toolbox,
getTaskLockHelper().getLockGranularityToUse(),
ioConfig,
Expand All @@ -465,7 +470,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
metricsSpec,
granularitySpec,
toolbox.getCoordinatorClient(),
segmentCacheManagerFactory
segmentCacheManagerFactory,
getMetricBuilder()
);
final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
.range(0, ingestionSpecs.size())
Expand Down Expand Up @@ -562,6 +568,7 @@ private String createIndexTaskSpecId(int i)
*/
@VisibleForTesting
static List<ParallelIndexIngestionSpec> createIngestionSchema(
final Clock clock,
final TaskToolbox toolbox,
final LockGranularity lockGranularityInUse,
final CompactionIOConfig ioConfig,
Expand All @@ -572,7 +579,8 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
@Nullable final AggregatorFactory[] metricsSpec,
@Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
final CoordinatorClient coordinatorClient,
final SegmentCacheManagerFactory segmentCacheManagerFactory
final SegmentCacheManagerFactory segmentCacheManagerFactory,
final ServiceMetricEvent.Builder metricBuilder
) throws IOException
{
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = retrieveRelevantTimelineHolders(
Expand Down Expand Up @@ -628,6 +636,9 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
// creates new granularitySpec and set segmentGranularity
Granularity segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
final DataSchema dataSchema = createDataSchema(
clock,
toolbox.getEmitter(),
metricBuilder,
segmentProvider.dataSource,
interval,
lazyFetchSegments(segmentsToCompact, toolbox.getSegmentCacheManager(), toolbox.getIndexIO()),
Expand Down Expand Up @@ -659,6 +670,9 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
} else {
// given segment granularity
final DataSchema dataSchema = createDataSchema(
clock,
toolbox.getEmitter(),
metricBuilder,
segmentProvider.dataSource,
JodaUtils.umbrellaInterval(
Iterables.transform(
Expand Down Expand Up @@ -756,6 +770,9 @@ private static List<TimelineObjectHolder<String, DataSegment>> retrieveRelevantT
}

private static DataSchema createDataSchema(
Clock clock,
ServiceEmitter emitter,
ServiceMetricEvent.Builder metricBuilder,
String dataSource,
Interval totalInterval,
Iterable<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>> segments,
Expand All @@ -773,8 +790,15 @@ private static DataSchema createDataSchema(
dimensionsSpec == null,
metricsSpec == null
);

existingSegmentAnalyzer.fetchAndProcessIfNeeded();
long start = clock.millis();
try {
existingSegmentAnalyzer.fetchAndProcessIfNeeded();
}
finally {
if (emitter != null) {
emitter.emit(metricBuilder.build("compact/segmentAnalyzer/fetchAndProcessMillis", clock.millis() - start));
}
}

final Granularity queryGranularityToUse;
if (granularitySpec.getQueryGranularity() == null) {
Expand Down
Loading

0 comments on commit 2af0ab2

Please sign in to comment.