-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Run compaction as a supervisor on Overlord #16768
Changes from 21 commits
d89802a
bea1078
9455547
26d4bdc
caefb27
bbc2eb6
8be913e
23899b8
9c06504
f972ea2
fdcfdf1
4b81779
4942a08
b2ae560
ad98383
2e676be
e793f44
8ece3f0
2f152e2
382bfdf
e111d71
93284b3
273822b
117b0ab
0a584d4
bfc9ceb
ab711ed
e3012f3
bca9810
6d84927
b439935
517f9ad
eceb767
9d04d9b
3d2e58f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
|
||
package org.apache.druid.guice; | ||
|
||
import com.fasterxml.jackson.databind.Module; | ||
import com.fasterxml.jackson.databind.jsontype.NamedType; | ||
import com.fasterxml.jackson.databind.module.SimpleModule; | ||
import com.google.common.collect.ImmutableList; | ||
import com.google.inject.Binder; | ||
import org.apache.druid.indexing.compact.CompactionSupervisorSpec; | ||
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; | ||
import org.apache.druid.initialization.DruidModule; | ||
|
||
import java.util.List; | ||
|
||
public class SupervisorModule implements DruidModule | ||
{ | ||
@Override | ||
public void configure(Binder binder) | ||
{ | ||
JsonConfigProvider.bind(binder, "druid.supervisor", SupervisorStateManagerConfig.class); | ||
} | ||
|
||
@Override | ||
public List<? extends Module> getJacksonModules() | ||
{ | ||
return ImmutableList.of( | ||
new SimpleModule(getClass().getSimpleName()) | ||
.registerSubtypes( | ||
new NamedType(CompactionSupervisorSpec.class, CompactionSupervisorSpec.TYPE) | ||
) | ||
); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.indexing.compact; | ||
|
||
import org.apache.druid.server.compaction.CompactionSimulateResult; | ||
import org.apache.druid.server.coordinator.AutoCompactionSnapshot; | ||
import org.apache.druid.server.coordinator.ClusterCompactionConfig; | ||
import org.apache.druid.server.coordinator.CompactionSupervisorsConfig; | ||
import org.apache.druid.server.coordinator.DataSourceCompactionConfig; | ||
|
||
import java.util.Map; | ||
|
||
/** | ||
* Compaction scheduler that runs on the Overlord if {@link CompactionSupervisorsConfig} | ||
* is enabled. | ||
* <p> | ||
* Usage: | ||
* <ul> | ||
* <li>When an active {@link CompactionSupervisor} starts, it should register | ||
* itself by calling {@link #startCompaction}.</li> | ||
* <li>When a suspended {@link CompactionSupervisor} starts, it should stop | ||
* compaction by calling {@link #stopCompaction}.</li> | ||
* <li>When stopping, any {@link CompactionSupervisor} (active or suspended) | ||
* should call {@link #stopCompaction}.</li> | ||
* </ul> | ||
*/ | ||
public interface CompactionScheduler | ||
kfaraz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
void start(); | ||
|
||
void stop(); | ||
|
||
boolean isRunning(); | ||
|
||
void startCompaction(String dataSourceName, DataSourceCompactionConfig compactionConfig); | ||
|
||
void stopCompaction(String dataSourceName); | ||
|
||
Map<String, AutoCompactionSnapshot> getAllCompactionSnapshots(); | ||
|
||
AutoCompactionSnapshot getCompactionSnapshot(String dataSource); | ||
|
||
CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionConfig updateRequest); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.indexing.compact; | ||
|
||
import org.apache.druid.indexing.overlord.DataSourceMetadata; | ||
import org.apache.druid.indexing.overlord.supervisor.Supervisor; | ||
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport; | ||
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; | ||
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; | ||
import org.apache.druid.java.util.common.DateTimes; | ||
import org.apache.druid.java.util.common.logger.Logger; | ||
import org.apache.druid.server.coordinator.AutoCompactionSnapshot; | ||
|
||
/** | ||
* Supervisor for compaction of a single datasource. | ||
*/ | ||
public class CompactionSupervisor implements Supervisor | ||
{ | ||
private static final Logger log = new Logger(CompactionSupervisor.class); | ||
|
||
private final String dataSource; | ||
private final CompactionScheduler scheduler; | ||
private final CompactionSupervisorSpec supervisorSpec; | ||
|
||
public CompactionSupervisor( | ||
CompactionSupervisorSpec supervisorSpec, | ||
CompactionScheduler scheduler | ||
) | ||
{ | ||
this.supervisorSpec = supervisorSpec; | ||
this.scheduler = scheduler; | ||
this.dataSource = supervisorSpec.getSpec().getDataSource(); | ||
} | ||
|
||
@Override | ||
public void start() | ||
{ | ||
if (supervisorSpec.isSuspended()) { | ||
log.info("Suspending compaction for dataSource[%s].", dataSource); | ||
scheduler.stopCompaction(dataSource); | ||
Comment on lines
+55
to
+57
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When is this case encountered, i.e. the supervisor for the datasource is starting but the compaction for the same datasource needs to be stopped? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is because of the way the supervisor framework works. This code is triggered when |
||
} else { | ||
log.info("Starting compaction for dataSource[%s].", dataSource); | ||
scheduler.startCompaction(dataSource, supervisorSpec.getSpec()); | ||
} | ||
} | ||
|
||
@Override | ||
public void stop(boolean stopGracefully) | ||
{ | ||
log.info("Stopping compaction for dataSource[%s].", dataSource); | ||
scheduler.stopCompaction(dataSource); | ||
} | ||
|
||
@Override | ||
public SupervisorReport<AutoCompactionSnapshot> getStatus() | ||
{ | ||
final AutoCompactionSnapshot snapshot; | ||
if (supervisorSpec.isSuspended()) { | ||
snapshot = AutoCompactionSnapshot.builder(dataSource) | ||
.withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED) | ||
.build(); | ||
} else { | ||
snapshot = scheduler.getCompactionSnapshot(dataSource); | ||
} | ||
|
||
return new SupervisorReport<>(supervisorSpec.getId(), DateTimes.nowUtc(), snapshot); | ||
} | ||
|
||
@Override | ||
public SupervisorStateManager.State getState() | ||
{ | ||
if (!scheduler.isRunning()) { | ||
return State.SCHEDULER_STOPPED; | ||
} else if (supervisorSpec.isSuspended()) { | ||
return State.SUSPENDED; | ||
} else { | ||
abhishekagarwal87 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return State.RUNNING; | ||
} | ||
} | ||
|
||
// Un-implemented methods used only by streaming supervisors | ||
|
||
@Override | ||
public void reset(DataSourceMetadata dataSourceMetadata) | ||
{ | ||
throw new UnsupportedOperationException("Resetting not supported for 'autocompact' supervisors."); | ||
} | ||
|
||
@Override | ||
public void resetOffsets(DataSourceMetadata resetDataSourceMetadata) | ||
{ | ||
throw new UnsupportedOperationException("Resetting offsets not supported for 'autocompact' supervisors."); | ||
} | ||
|
||
@Override | ||
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata) | ||
{ | ||
throw new UnsupportedOperationException("Checkpointing not supported for 'autocompact' supervisors."); | ||
} | ||
|
||
@Override | ||
public LagStats computeLagStats() | ||
{ | ||
throw new UnsupportedOperationException("Lag stats not supported for 'autocompact' supervisors."); | ||
} | ||
|
||
@Override | ||
public int getActiveTaskGroupsCount() | ||
{ | ||
throw new UnsupportedOperationException("Task groups not supported for 'autocompact' supervisors."); | ||
} | ||
|
||
public enum State implements SupervisorStateManager.State | ||
|
||
{ | ||
SCHEDULER_STOPPED(true), | ||
RUNNING(true), | ||
SUSPENDED(true), | ||
UNHEALTHY(false); | ||
|
||
private final boolean healthy; | ||
|
||
State(boolean healthy) | ||
{ | ||
this.healthy = healthy; | ||
} | ||
|
||
@Override | ||
public boolean isFirstRunOnly() | ||
{ | ||
return false; | ||
} | ||
|
||
@Override | ||
public boolean isHealthy() | ||
{ | ||
return healthy; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initially, I had intended to use this interface in
DruidCoordinator
too but that seemed unnecessary.Right now, the purpose of this interface is to hide implementation details in the
OverlordCompactionScheduler
and cleaner guice bindings. It can also be useful for unit testing classes that depend on theOverlordCompactionScheduler
.Let me know if it seems unnecessary.