Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Duties in Indexing group (such as Auto Compaction) do not report metrics #12352

Merged
merged 5 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public class DruidCoordinator
private int cachedBalancerThreadNumber;
private ListeningExecutorService balancerExec;

private static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties";
public static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties";
private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = "MetadataStoreManagementDuties";
private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties";
private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP = "CompactSegmentsDuties";
Expand Down Expand Up @@ -765,8 +765,7 @@ private List<CoordinatorDuty> makeHistoricalManagementDuties()
new RunRules(DruidCoordinator.this),
new UnloadUnusedSegments(),
new MarkAsUnusedOvershadowedSegments(DruidCoordinator.this),
new BalanceSegments(DruidCoordinator.this),
new EmitClusterStatsAndMetrics(DruidCoordinator.this)
new BalanceSegments(DruidCoordinator.this)
);
}

Expand Down Expand Up @@ -841,7 +840,17 @@ protected class DutiesRunnable implements Runnable

/**
 * Creates a runnable for one coordinator duty group.
 * <p>
 * If the supplied duties do not already include an {@link EmitClusterStatsAndMetrics}
 * duty, one is appended automatically so that metrics from this group are always emitted.
 *
 * @param duties                duties to run, in order
 * @param startingLeaderCounter leader counter at construction time; presumably used to
 *                              stop the runnable when leadership changes — TODO confirm
 * @param alias                 name of this duty group; passed to EmitClusterStatsAndMetrics
 *                              as the duty-group metric dimension
 */
protected DutiesRunnable(List<? extends CoordinatorDuty> duties, final int startingLeaderCounter, String alias)
{
// Automatically add the EmitClusterStatsAndMetrics duty to the group if it does not already exist.
// This avoids a human coding error (forgetting to add the EmitClusterStatsAndMetrics duty to the
// group) that would cause metrics from the duties in this group to not be emitted.
if (duties.stream().noneMatch(duty -> duty instanceof EmitClusterStatsAndMetrics)) {
boolean isContainCompactSegmentDuty = duties.stream().anyMatch(duty -> duty instanceof CompactSegments);
List<CoordinatorDuty> allDuties = new ArrayList<>(duties);
allDuties.add(new EmitClusterStatsAndMetrics(DruidCoordinator.this, alias, isContainCompactSegmentDuty));
this.duties = allDuties;
} else {
this.duties = duties;
}
this.startingLeaderCounter = startingLeaderCounter;
this.dutiesRunnableAlias = alias;
}
Expand Down Expand Up @@ -958,6 +967,12 @@ public void run()
log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit();
}
}

// Exposed for tests to verify that an EmitClusterStatsAndMetrics duty was auto-added
// (or not duplicated) by the constructor.
@VisibleForTesting
public List<? extends CoordinatorDuty> getDuties()
{
return duties;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,14 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty
public static final String MAX_REPLICATION_FACTOR = "maxReplicationFactor";

private final DruidCoordinator coordinator;
private final String groupName;
private final boolean isContainCompactSegmentDuty;

public EmitClusterStatsAndMetrics(DruidCoordinator coordinator)
/**
 * @param coordinator                 coordinator whose state is queried when emitting stats
 * @param groupName                   name of the duty group this instance runs in; emitted as
 *                                    the {@code DruidMetrics.DUTY_GROUP} dimension on every metric
 * @param isContainCompactSegmentDuty whether the group contains a {@code CompactSegments} duty;
 *                                    when true, compaction stats are also emitted
 */
public EmitClusterStatsAndMetrics(DruidCoordinator coordinator, String groupName, boolean isContainCompactSegmentDuty)
{
this.coordinator = coordinator;
this.groupName = groupName;
this.isContainCompactSegmentDuty = isContainCompactSegmentDuty;
}

private void emitTieredStat(
Expand All @@ -64,6 +68,7 @@ private void emitTieredStat(
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.TIER, tier)
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.build(metricName, value)
);
}
Expand All @@ -78,6 +83,7 @@ private void emitTieredStat(
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.TIER, tier)
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.build(metricName, value)
);
}
Expand Down Expand Up @@ -107,6 +113,7 @@ private void emitDutyStat(
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY, duty)
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.build(metricName, value)
);
}
Expand All @@ -133,6 +140,21 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
CoordinatorStats stats = params.getCoordinatorStats();
ServiceEmitter emitter = params.getEmitter();

if (DruidCoordinator.HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP.equals(groupName)) {
emitStatsForHistoricalManagementDuties(cluster, stats, emitter, params);
}
if (isContainCompactSegmentDuty) {
emitStatsForCompactSegments(cluster, stats, emitter);
}

// Emit coordinator runtime stats
emitDutyStats(emitter, "coordinator/time", stats, "runtime");

return params;
}

private void emitStatsForHistoricalManagementDuties(DruidCluster cluster, CoordinatorStats stats, ServiceEmitter emitter, DruidCoordinatorRuntimeParams params)
{
stats.forEachTieredStat(
"assignedCount",
(final String tier, final long count) -> {
Expand Down Expand Up @@ -190,7 +212,9 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
);

emitter.emit(
new ServiceMetricEvent.Builder().build(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.build(
"segment/overShadowed/count",
stats.getGlobalStat("overShadowedCount")
)
Expand Down Expand Up @@ -269,24 +293,28 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
.forEach((final String serverName, final LoadQueuePeon queuePeon) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/loadQueue/size", queuePeon.getLoadQueueSize()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/loadQueue/failed", queuePeon.getAndResetFailedAssignCount()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/loadQueue/count", queuePeon.getSegmentsToLoad().size()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/dropQueue/count", queuePeon.getSegmentsToDrop().size()
)
Expand All @@ -299,6 +327,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
final int numUnavailableUsedSegmentsInDataSource = entry.getIntValue();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource).build(
"segment/unavailable/count", numUnavailableUsedSegmentsInDataSource
)
Expand All @@ -314,6 +343,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)

emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.TIER, tier)
.setDimension(DruidMetrics.DATASOURCE, dataSource).build(
"segment/underReplicated/count", underReplicationCount
Expand All @@ -323,22 +353,54 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
}
);

// Emit segment metrics
params.getUsedSegmentsTimelinesPerDataSource().forEach(
(String dataSource, VersionedIntervalTimeline<String, DataSegment> dataSourceWithUsedSegments) -> {
long totalSizeOfUsedSegments = dataSourceWithUsedSegments
.iterateAllObjects()
.stream()
.mapToLong(DataSegment::getSize)
.sum();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/size", totalSizeOfUsedSegments)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/count", dataSourceWithUsedSegments.getNumObjects())
);
}
);
}

private void emitStatsForCompactSegments(DruidCluster cluster, CoordinatorStats stats, ServiceEmitter emitter)
{
emitter.emit(
new ServiceMetricEvent.Builder().build(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.build(
"compact/task/count",
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
)
);

emitter.emit(
new ServiceMetricEvent.Builder().build(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.build(
"compactTask/maxSlot/count",
stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
)
);

emitter.emit(
new ServiceMetricEvent.Builder().build(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.build(
"compactTask/availableSlot/count",
stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
)
Expand All @@ -349,6 +411,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/waitCompact/bytes", count)
);
Expand All @@ -360,6 +423,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/waitCompact/count", count)
);
Expand All @@ -371,6 +435,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("interval/waitCompact/count", count)
);
Expand All @@ -382,6 +447,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/skipCompact/bytes", count)
);
Expand All @@ -393,6 +459,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/skipCompact/count", count)
);
Expand All @@ -404,6 +471,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("interval/skipCompact/count", count)
);
Expand All @@ -415,6 +483,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/compacted/bytes", count)
);
Expand All @@ -426,6 +495,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/compacted/count", count)
);
Expand All @@ -437,36 +507,11 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DUTY_GROUP, groupName)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("interval/compacted/count", count)
);
}
);

// Emit segment metrics
params.getUsedSegmentsTimelinesPerDataSource().forEach(
(String dataSource, VersionedIntervalTimeline<String, DataSegment> dataSourceWithUsedSegments) -> {
long totalSizeOfUsedSegments = dataSourceWithUsedSegments
.iterateAllObjects()
.stream()
.mapToLong(DataSegment::getSize)
.sum();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/size", totalSizeOfUsedSegments)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/count", dataSourceWithUsedSegments.getNumObjects())
);
}
);

// Emit coordinator runtime stats
emitDutyStats(emitter, "coordinator/time", stats, "runtime");

return params;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics;
import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty;
import org.apache.druid.server.coordinator.duty.LogUsedSegments;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
Expand Down Expand Up @@ -1133,6 +1135,34 @@ public void unannounce(DruidNode node)
latch2.await();
}

@Test
public void testEmitClusterStatsAndMetricsAddedWhenNotInDutyList()
{
// A group built without an EmitClusterStatsAndMetrics duty should have exactly one added.
DruidCoordinator.DutiesRunnable dutyRunnable =
    coordinator.new DutiesRunnable(ImmutableList.of(new LogUsedSegments()), 0, "TEST");
long numEmitDuties = dutyRunnable.getDuties()
                                 .stream()
                                 .filter(duty -> duty instanceof EmitClusterStatsAndMetrics)
                                 .count();
Assert.assertEquals(1L, numEmitDuties);
}

@Test
public void testEmitClusterStatsAndMetricsNotAddedAgainWhenInDutyList()
{
// A group that already carries an EmitClusterStatsAndMetrics duty must not receive a second one.
DruidCoordinator.DutiesRunnable dutyRunnable =
    coordinator.new DutiesRunnable(
        ImmutableList.of(new LogUsedSegments(), new EmitClusterStatsAndMetrics(coordinator, "TEST", false)),
        0,
        "TEST"
    );
long numEmitDuties = dutyRunnable.getDuties()
                                 .stream()
                                 .filter(duty -> duty instanceof EmitClusterStatsAndMetrics)
                                 .count();
Assert.assertEquals(1L, numEmitDuties);
}

private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(int latchCount,
PathChildrenCache pathChildrenCache,
Map<String, DataSegment> segments,
Expand Down
Loading