Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow coordinator run auto compaction duty period to be configured separately from other indexing duties #12263

Merged
merged 13 commits into from
Feb 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/design/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ Compaction tasks might fail due to the following reasons.

Once a compaction task fails, the Coordinator simply checks the segments in the interval of the failed task again, and issues another compaction task in the next run.

Note that Compacting Segments Coordinator Duty is automatically enabled and run as part of the Indexing Service Duties group. However, Compacting Segments Coordinator Duty can be configured to run in isolation as a separate coordinator duty group. This allows changing the period of Compacting Segments Coordinator Duty without impacting the period of other Indexing Service Duties. This can be done by setting the following properties (for more details see [custom pluggable Coordinator Duty](../development/modules.md#adding-your-own-custom-pluggable-coordinator-duty)):
```
druid.coordinator.dutyGroups=[<SOME_GROUP_NAME>]
druid.coordinator.<SOME_GROUP_NAME>.duties=["compactSegments"]
druid.coordinator.<SOME_GROUP_NAME>.period=<PERIOD_TO_RUN_COMPACTING_SEGMENTS_DUTY>
```

### Segment search policy

#### Recent segment first policy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.server.coordinator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
Expand Down Expand Up @@ -158,7 +159,7 @@ public class DruidCoordinator
private final BalancerStrategyFactory factory;
private final LookupCoordinatorManager lookupCoordinatorManager;
private final DruidLeaderSelector coordLeaderSelector;

private final ObjectMapper objectMapper;
private final CompactSegments compactSegments;

private volatile boolean started = false;
Expand Down Expand Up @@ -194,7 +195,7 @@ public DruidCoordinator(
BalancerStrategyFactory factory,
LookupCoordinatorManager lookupCoordinatorManager,
@Coordinator DruidLeaderSelector coordLeaderSelector,
CompactSegments compactSegments,
ObjectMapper objectMapper,
ZkEnablementConfig zkEnablementConfig
)
{
Expand All @@ -219,7 +220,7 @@ public DruidCoordinator(
factory,
lookupCoordinatorManager,
coordLeaderSelector,
compactSegments,
objectMapper,
zkEnablementConfig
);
}
Expand All @@ -245,7 +246,7 @@ public DruidCoordinator(
BalancerStrategyFactory factory,
LookupCoordinatorManager lookupCoordinatorManager,
DruidLeaderSelector coordLeaderSelector,
CompactSegments compactSegments,
ObjectMapper objectMapper,
ZkEnablementConfig zkEnablementConfig
)
{
Expand Down Expand Up @@ -276,8 +277,8 @@ public DruidCoordinator(
this.factory = factory;
this.lookupCoordinatorManager = lookupCoordinatorManager;
this.coordLeaderSelector = coordLeaderSelector;

this.compactSegments = compactSegments;
this.objectMapper = objectMapper;
this.compactSegments = initializeCompactSegmentsDuty();
}

public boolean isLeader()
Expand Down Expand Up @@ -769,14 +770,17 @@ private List<CoordinatorDuty> makeHistoricalManagementDuties()
);
}

private List<CoordinatorDuty> makeIndexingServiceDuties()
@VisibleForTesting
List<CoordinatorDuty> makeIndexingServiceDuties()
{
List<CoordinatorDuty> duties = new ArrayList<>();
duties.add(new LogUsedSegments());
duties.addAll(indexingServiceDuties);
// CompactSegmentsDuty should be the last duty as it can take a long time to complete
duties.addAll(makeCompactSegmentsDuty());

// We do not have to add compactSegments if it is already enabled in the custom duty group
if (getCompactSegmentsDutyFromCustomGroups().isEmpty()) {
duties.addAll(makeCompactSegmentsDuty());
}
log.debug(
"Done making indexing service duties %s",
duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList())
Expand All @@ -797,6 +801,31 @@ private List<CoordinatorDuty> makeMetadataStoreManagementDuties()
return ImmutableList.copyOf(duties);
}

@VisibleForTesting
CompactSegments initializeCompactSegmentsDuty()
{
List<CompactSegments> compactSegmentsDutyFromCustomGroups = getCompactSegmentsDutyFromCustomGroups();
if (compactSegmentsDutyFromCustomGroups.isEmpty()) {
return new CompactSegments(config, objectMapper, indexingServiceClient);
} else {
if (compactSegmentsDutyFromCustomGroups.size() > 1) {
log.warn("More than one compactSegments duty is configured in the Coordinator Custom Duty Group. The first duty will be picked up.");
}
return compactSegmentsDutyFromCustomGroups.get(0);
}
}

@VisibleForTesting
List<CompactSegments> getCompactSegmentsDutyFromCustomGroups()
{
return customDutyGroups.getCoordinatorCustomDutyGroups()
.stream()
.flatMap(coordinatorCustomDutyGroup -> coordinatorCustomDutyGroup.getCustomDutyList().stream())
.filter(duty -> duty instanceof CompactSegments)
.map(duty -> (CompactSegments) duty)
.collect(Collectors.toList());
}

private List<CoordinatorDuty> makeCompactSegmentsDuty()
{
return ImmutableList.of(compactSegments);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.druid.server.coordinator.duty;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
Expand Down Expand Up @@ -56,7 +58,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

public class CompactSegments implements CoordinatorDuty
public class CompactSegments implements CoordinatorCustomDuty
{
static final String COMPACTION_TASK_COUNT = "compactTaskCount";
static final String AVAILABLE_COMPACTION_TASK_SLOT = "availableCompactionTaskSlot";
Expand Down Expand Up @@ -90,10 +92,11 @@ public class CompactSegments implements CoordinatorDuty
private final AtomicReference<Map<String, AutoCompactionSnapshot>> autoCompactionSnapshotPerDataSource = new AtomicReference<>();

@Inject
@JsonCreator
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a serde test for this class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

public CompactSegments(
DruidCoordinatorConfig config,
ObjectMapper objectMapper,
IndexingServiceClient indexingServiceClient
@JacksonInject DruidCoordinatorConfig config,
@JacksonInject ObjectMapper objectMapper,
@JacksonInject IndexingServiceClient indexingServiceClient
)
{
this.policy = new NewestSegmentFirstPolicy(objectMapper);
Expand All @@ -104,6 +107,18 @@ public CompactSegments(
LOG.info("Scheduling compaction with skipLockedIntervals [%s]", skipLockedIntervals);
}

@VisibleForTesting
public boolean isSkipLockedIntervals()
{
return skipLockedIntervals;
}

@VisibleForTesting
IndexingServiceClient getIndexingServiceClient()
{
return indexingServiceClient;
}

@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(name = "killSupervisors", value = KillSupervisorsCustomDuty.class),
@JsonSubTypes.Type(name = "compactSegments", value = CompactSegments.class),
})
@ExtensionPoint
public interface CoordinatorCustomDuty extends CoordinatorDuty
Expand Down
Loading