Commit ea51d8a

Duties in Indexing group (such as Auto Compaction) do not report metrics (#12352)

* add impl

* add unit tests

* fix checkstyle

* address comments

* fix checkstyle
maytasm authored Mar 24, 2022
1 parent b6eeef3 commit ea51d8a
Showing 4 changed files with 244 additions and 35 deletions.
DruidCoordinator.java

@@ -169,7 +169,7 @@ public class DruidCoordinator
private int cachedBalancerThreadNumber;
private ListeningExecutorService balancerExec;

-private static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties";
+public static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties";
private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = "MetadataStoreManagementDuties";
private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties";
private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP = "CompactSegmentsDuties";
@@ -765,8 +765,7 @@ private List<CoordinatorDuty> makeHistoricalManagementDuties()
new RunRules(DruidCoordinator.this),
new UnloadUnusedSegments(),
new MarkAsUnusedOvershadowedSegments(DruidCoordinator.this),
-new BalanceSegments(DruidCoordinator.this),
-new EmitClusterStatsAndMetrics(DruidCoordinator.this)
+new BalanceSegments(DruidCoordinator.this)
);
}

@@ -841,7 +840,17 @@ protected class DutiesRunnable implements Runnable

protected DutiesRunnable(List<? extends CoordinatorDuty> duties, final int startingLeaderCounter, String alias)
{
-this.duties = duties;
+// Automatically add the EmitClusterStatsAndMetrics duty to the group if it does not already exist.
+// This avoids a human coding error (forgetting to add the EmitClusterStatsAndMetrics duty to the group)
+// that would cause metrics from the duties to not be emitted.
+if (duties.stream().noneMatch(duty -> duty instanceof EmitClusterStatsAndMetrics)) {
+boolean isContainCompactSegmentDuty = duties.stream().anyMatch(duty -> duty instanceof CompactSegments);
+List<CoordinatorDuty> allDuties = new ArrayList<>(duties);
+allDuties.add(new EmitClusterStatsAndMetrics(DruidCoordinator.this, alias, isContainCompactSegmentDuty));
+this.duties = allDuties;
+} else {
+this.duties = duties;
+}
this.startingLeaderCounter = startingLeaderCounter;
this.dutiesRunnableAlias = alias;
}
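For context, here is a minimal, self-contained sketch of the guard pattern the constructor above uses. The Duty and MetricsDuty types are hypothetical stand-ins for CoordinatorDuty and EmitClusterStatsAndMetrics, used purely for illustration:

  import java.util.ArrayList;
  import java.util.List;

  interface Duty {}

  class MetricsDuty implements Duty {}

  class DutyGroup
  {
    private final List<? extends Duty> duties;

    DutyGroup(List<? extends Duty> duties)
    {
      // Append the metrics-emitting duty unless the caller already supplied one,
      // so no duty group can silently run without reporting its metrics.
      if (duties.stream().noneMatch(duty -> duty instanceof MetricsDuty)) {
        List<Duty> allDuties = new ArrayList<>(duties);
        allDuties.add(new MetricsDuty());
        this.duties = allDuties;
      } else {
        this.duties = duties;
      }
    }
  }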
@@ -958,6 +967,12 @@ public void run()
log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit();
}
}

@VisibleForTesting
public List<? extends CoordinatorDuty> getDuties()
{
return duties;
}
}

/**
EmitClusterStatsAndMetrics.java

@@ -48,10 +48,14 @@ public class EmitClusterStatsAndMetrics
public static final String MAX_REPLICATION_FACTOR = "maxReplicationFactor";

private final DruidCoordinator coordinator;
private final String groupName;
private final boolean isContainCompactSegmentDuty;

-public EmitClusterStatsAndMetrics(DruidCoordinator coordinator)
+public EmitClusterStatsAndMetrics(DruidCoordinator coordinator, String groupName, boolean isContainCompactSegmentDuty)
{
this.coordinator = coordinator;
this.groupName = groupName;
this.isContainCompactSegmentDuty = isContainCompactSegmentDuty;
}
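A hypothetical call site, shown only to illustrate the widened constructor; in practice DutiesRunnable performs this wiring automatically, and the group name and flag here are example values:

  EmitClusterStatsAndMetrics emitDuty = new EmitClusterStatsAndMetrics(
      coordinator,               // coordinator whose cluster state is reported
      "IndexingServiceDuties",   // duty group name, attached to every emitted metric
      true                       // whether the group contains a CompactSegments duty
  );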

private void emitTieredStat(
@@ -64,6 +68,7 @@ private void emitTieredStat(
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.TIER, tier)
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.build(metricName, value)
);
}
@@ -78,6 +83,7 @@ private void emitTieredStat(
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.TIER, tier)
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.build(metricName, value)
);
}
@@ -107,6 +113,7 @@ private void emitDutyStat(
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY, duty)
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.build(metricName, value)
);
}
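To make the effect of the new dimension concrete, a sketch of an event emitDutyStat would now emit for a duty in the indexing group; the duty name and value are illustrative only, assuming an emitter is in scope as in the surrounding code:

  emitter.emit(
      new ServiceMetricEvent.Builder()
          .setDimension(DruidMetrics.DUTY, "CompactSegments")          // example duty name
          .setDimension(DruidMetrics.DUTY_GROUP, "IndexingServiceDuties")
          .build("coordinator/time", 125L)                             // example runtime in ms
  );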
@@ -133,6 +140,21 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
CoordinatorStats stats = params.getCoordinatorStats();
ServiceEmitter emitter = params.getEmitter();

if (DruidCoordinator.HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP.equals(groupName)) {
emitStatsForHistoricalManagementDuties(cluster, stats, emitter, params);
}
if (isContainCompactSegmentDuty) {
emitStatsForCompactSegments(cluster, stats, emitter);
}

// Emit coordinator runtime stats
emitDutyStats(emitter, "coordinator/time", stats, "runtime");

return params;
}

private void emitStatsForHistoricalManagementDuties(DruidCluster cluster, CoordinatorStats stats, ServiceEmitter emitter, DruidCoordinatorRuntimeParams params)
{
stats.forEachTieredStat(
"assignedCount",
(final String tier, final long count) -> {
@@ -190,7 +212,9 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
);

emitter.emit(
-new ServiceMetricEvent.Builder().build(
+new ServiceMetricEvent.Builder()
+.setDimension(DruidMetrics.DUTY_GROUP, groupName)
+.build(
"segment/overShadowed/count",
stats.getGlobalStat("overShadowedCount")
)
@@ -269,24 +293,28 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
.forEach((final String serverName, final LoadQueuePeon queuePeon) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/loadQueue/size", queuePeon.getLoadQueueSize()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/loadQueue/failed", queuePeon.getAndResetFailedAssignCount()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/loadQueue/count", queuePeon.getSegmentsToLoad().size()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/dropQueue/count", queuePeon.getSegmentsToDrop().size()
)
@@ -299,6 +327,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
final int numUnavailableUsedSegmentsInDataSource = entry.getIntValue();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource).build(
"segment/unavailable/count", numUnavailableUsedSegmentsInDataSource
)
@@ -314,6 +343,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)

emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.TIER, tier)
.setDimension(DruidMetrics.DATASOURCE, dataSource).build(
"segment/underReplicated/count", underReplicationCount
@@ -323,22 +353,54 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
}
);

// Emit segment metrics
params.getUsedSegmentsTimelinesPerDataSource().forEach(
(String dataSource, VersionedIntervalTimeline<String, DataSegment> dataSourceWithUsedSegments) -> {
long totalSizeOfUsedSegments = dataSourceWithUsedSegments
.iterateAllObjects()
.stream()
.mapToLong(DataSegment::getSize)
.sum();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/size", totalSizeOfUsedSegments)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/count", dataSourceWithUsedSegments.getNumObjects())
);
}
);
}

private void emitStatsForCompactSegments(DruidCluster cluster, CoordinatorStats stats, ServiceEmitter emitter)
{
emitter.emit(
-new ServiceMetricEvent.Builder().build(
+new ServiceMetricEvent.Builder()
+.setDimension(DruidMetrics.DUTY_GROUP, groupName)
+.build(
"compact/task/count",
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
)
);

emitter.emit(
-new ServiceMetricEvent.Builder().build(
+new ServiceMetricEvent.Builder()
+.setDimension(DruidMetrics.DUTY_GROUP, groupName)
+.build(
"compactTask/maxSlot/count",
stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
)
);

emitter.emit(
-new ServiceMetricEvent.Builder().build(
+new ServiceMetricEvent.Builder()
+.setDimension(DruidMetrics.DUTY_GROUP, groupName)
+.build(
"compactTask/availableSlot/count",
stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
)
@@ -349,6 +411,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/waitCompact/bytes", count)
);
@@ -360,6 +423,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/waitCompact/count", count)
);
@@ -371,6 +435,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("interval/waitCompact/count", count)
);
@@ -382,6 +447,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/skipCompact/bytes", count)
);
@@ -393,6 +459,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/skipCompact/count", count)
);
@@ -404,6 +471,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("interval/skipCompact/count", count)
);
@@ -415,6 +483,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/compacted/bytes", count)
);
@@ -426,6 +495,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/compacted/count", count)
);
@@ -437,36 +507,11 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("interval/compacted/count", count)
);
}
);

-// Emit segment metrics
-params.getUsedSegmentsTimelinesPerDataSource().forEach(
-(String dataSource, VersionedIntervalTimeline<String, DataSegment> dataSourceWithUsedSegments) -> {
-long totalSizeOfUsedSegments = dataSourceWithUsedSegments
-.iterateAllObjects()
-.stream()
-.mapToLong(DataSegment::getSize)
-.sum();
-emitter.emit(
-new ServiceMetricEvent.Builder()
-.setDimension(DruidMetrics.DATASOURCE, dataSource)
-.build("segment/size", totalSizeOfUsedSegments)
-);
-emitter.emit(
-new ServiceMetricEvent.Builder()
-.setDimension(DruidMetrics.DATASOURCE, dataSource)
-.build("segment/count", dataSourceWithUsedSegments.getNumObjects())
-);
-}
-);
-
-// Emit coordinator runtime stats
-emitDutyStats(emitter, "coordinator/time", stats, "runtime");
-
-return params;
}
}
DruidCoordinatorTest.java

@@ -57,7 +57,9 @@
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics;
import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty;
import org.apache.druid.server.coordinator.duty.LogUsedSegments;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
@@ -1133,6 +1135,34 @@ public void unannounce(DruidNode node)
latch2.await();
}

@Test
public void testEmitClusterStatsAndMetricsAddedWhenNotInDutyList()
{
DruidCoordinator.DutiesRunnable dutyRunnable = coordinator.new DutiesRunnable(ImmutableList.of(new LogUsedSegments()), 0, "TEST");
List<? extends CoordinatorDuty> duties = dutyRunnable.getDuties();
int emitDutyFound = 0;
for (CoordinatorDuty duty : duties) {
if (duty instanceof EmitClusterStatsAndMetrics) {
emitDutyFound++;
}
}
Assert.assertEquals(1, emitDutyFound);
}

@Test
public void testEmitClusterStatsAndMetricsNotAddedAgainWhenInDutyList()
{
DruidCoordinator.DutiesRunnable dutyRunnable = coordinator.new DutiesRunnable(ImmutableList.of(new LogUsedSegments(), new EmitClusterStatsAndMetrics(coordinator, "TEST", false)), 0, "TEST");
List<? extends CoordinatorDuty> duties = dutyRunnable.getDuties();
int emitDutyFound = 0;
for (CoordinatorDuty duty : duties) {
if (duty instanceof EmitClusterStatsAndMetrics) {
emitDutyFound++;
}
}
Assert.assertEquals(1, emitDutyFound);
}

private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(int latchCount,
PathChildrenCache pathChildrenCache,
Map<String, DataSegment> segments,