handles failure in compaction planner #4586

Merged
Changes from 1 commit
@@ -42,6 +42,8 @@ public enum CacheName {
COMPACTION_DIR_CACHE,
COMPACTION_DISPATCHERS,
COMPACTION_SERVICE_UNKNOWN,
COMPACTION_PLANNER_INIT_FAILED,
COMPACTION_PLANNER_FAILED,
COMPACTOR_GROUP_ID,
COMPRESSION_ALGORITHM,
CRYPT_PASSWORDS,
@@ -55,6 +55,7 @@
import org.apache.accumulo.core.util.time.SteadyTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import com.github.benmanes.caffeine.cache.Cache;

@@ -67,6 +68,8 @@ public class CompactionJobGenerator {
private final PluginEnvironment env;
private final Map<FateId,Map<String,String>> allExecutionHints;
private final Cache<Pair<TableId,CompactionServiceId>,Long> unknownCompactionServiceErrorCache;
private final Cache<Pair<TableId,CompactionServiceId>,Long> plannerInitErrorCache;
private final Cache<Pair<TableId,CompactionServiceId>,Long> planningErrorCache;
private final SteadyTime steadyTime;

public CompactionJobGenerator(PluginEnvironment env,
@@ -90,19 +93,20 @@ public CompactionJobGenerator(PluginEnvironment env,
unknownCompactionServiceErrorCache =
Caches.getInstance().createNewBuilder(CacheName.COMPACTION_SERVICE_UNKNOWN, false)
.expireAfterWrite(5, TimeUnit.MINUTES).build();
plannerInitErrorCache =
Caches.getInstance().createNewBuilder(CacheName.COMPACTION_PLANNER_INIT_FAILED, false)
.expireAfterWrite(5, TimeUnit.MINUTES).build();
planningErrorCache =
Caches.getInstance().createNewBuilder(CacheName.COMPACTION_PLANNER_FAILED, false)
.expireAfterWrite(5, TimeUnit.MINUTES).build();
this.steadyTime = steadyTime;
}

public Collection<CompactionJob> generateJobs(TabletMetadata tablet, Set<CompactionKind> kinds) {

// ELASTICITY_TODO do not want user configured plugins to cause exceptions that prevent
// tablets from being assigned. So probably want to catch exceptions and log, but not too
// spammily, OR somehow report something back to the manager so it can log.

Collection<CompactionJob> systemJobs = Set.of();

log.debug("Planning for {} {} {}", tablet.getExtent(), kinds, this.hashCode());

if (kinds.contains(CompactionKind.SYSTEM)) {
CompactionServiceId serviceId = dispatch(CompactionKind.SYSTEM, tablet, Map.of());
systemJobs = planCompactions(serviceId, CompactionKind.SYSTEM, tablet, Map.of());
@@ -162,23 +166,29 @@ public Map<String,String> getExecutionHints() {
return dispatcher.dispatch(dispatchParams).getService();
}

private Level getLevel(TableId tableId, CompactionServiceId serviceId,
Contributor: Curious if you can use the EscalatingLogger (or some other variant of ConditionalLogger) that's included in #4558.

Contributor Author: I can try that.

Contributor Author: Updated to use EscalatingLogger in 5500afb. Ran the new IT in this PR, looked at the logs, and the changes are working. I was wondering if the exception some of the log messages had would be included in the key created from the arguments in ConditionalLogger. Poked around a bit and found it's not included.

Contributor: Do you think the exception needs to be in the key?

Contributor: I'm wondering if there is a downside to having too many ConditionalLoggers, as there is a Caffeine cache for each one. It might be better to have one and add the logger name to the key.

Cache<Pair<TableId,CompactionServiceId>,Long> lastErrorCache) {
var cacheKey = new Pair<>(tableId, serviceId);
var last = lastErrorCache.getIfPresent(cacheKey);
if (last == null) {
lastErrorCache.put(cacheKey, System.currentTimeMillis());
return Level.ERROR;
} else {
return Level.TRACE;
}
}
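The getLevel() method above implements a "log loudly once, then quietly" pattern: the first error for a (table, service) pair is logged at ERROR, and repeats are demoted to TRACE until the cache entry expires. A minimal self-contained sketch of the same idea, with a plain ConcurrentHashMap and manual timestamp checks standing in for the Caffeine cache's expireAfterWrite, and string constants standing in for org.slf4j.event.Level (both assumptions, not the PR's actual dependencies):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RateLimitedLevelChooser {
    static final String LEVEL_ERROR = "ERROR";
    static final String LEVEL_TRACE = "TRACE";

    // key -> time the last ERROR-level message was emitted for that key
    private final Map<String,Long> lastErrorMillis = new ConcurrentHashMap<>();
    private final long suppressMillis;

    public RateLimitedLevelChooser(long suppressMillis) {
        this.suppressMillis = suppressMillis;
    }

    // Returns ERROR the first time a key is seen (or after the suppression
    // window elapses), TRACE otherwise -- mirroring getLevel() above.
    public String levelFor(String key) {
        long now = System.currentTimeMillis();
        Long last = lastErrorMillis.get(key);
        if (last == null || now - last >= suppressMillis) {
            lastErrorMillis.put(key, now);
            return LEVEL_ERROR;
        }
        return LEVEL_TRACE;
    }

    public static void main(String[] args) {
        // 5 minutes, matching the expireAfterWrite used for the caches in the PR
        RateLimitedLevelChooser chooser = new RateLimitedLevelChooser(5 * 60 * 1000L);
        System.out.println(chooser.levelFor("table1/cs1")); // first sighting -> ERROR
        System.out.println(chooser.levelFor("table1/cs1")); // suppressed -> TRACE
        System.out.println(chooser.levelFor("table2/cs1")); // different key -> ERROR
    }
}
```

Caffeine's expireAfterWrite additionally evicts entries in the background, so the real code never needs the manual timestamp comparison; the map-based version is only for illustration.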

private Collection<CompactionJob> planCompactions(CompactionServiceId serviceId,
CompactionKind kind, TabletMetadata tablet, Map<String,String> executionHints) {

if (!servicesConfig.getPlanners().containsKey(serviceId.canonical())) {
var cacheKey = new Pair<>(tablet.getTableId(), serviceId);
var last = unknownCompactionServiceErrorCache.getIfPresent(cacheKey);
if (last == null) {
// have not logged an error recently for this, so lets log one
log.error(
"Tablet {} returned non-existent compaction service {} for compaction type {}. Check"
+ " the table compaction dispatcher configuration. No compactions will happen"
+ " until the configuration is fixed. This log message is temporarily suppressed for the"
+ " entire table.",
tablet.getExtent(), serviceId, kind);
unknownCompactionServiceErrorCache.put(cacheKey, System.currentTimeMillis());
}

Level level = getLevel(tablet.getTableId(), serviceId, unknownCompactionServiceErrorCache);
log.atLevel(level).log(
"Tablet {} returned non-existent compaction service {} for compaction type {}. Check"
+ " the table compaction dispatcher configuration. No compactions will happen"
+ " until the configuration is fixed. This log message is temporarily suppressed for the"
+ " entire table.",
tablet.getExtent(), serviceId, kind);
return Set.of();
}

@@ -299,8 +309,7 @@ public CompactionPlan.Builder createPlanBuilder() {
return new CompactionPlanImpl.BuilderImpl(kind, allFiles, candidates);
}
};

return planner.makePlan(params).getJobs();
return planCompactions(planner, params, serviceId);
}

private CompactionPlanner createPlanner(TableId tableId, CompactionServiceId serviceId) {
@@ -317,11 +326,28 @@ private CompactionPlanner createPlanner(TableId tableId, CompactionServiceId ser
servicesConfig.getOptions().get(serviceId.canonical()), (ServiceEnvironment) env);
planner.init(initParameters);
} catch (Exception e) {
log.error(
"Failed to create compaction planner for {} using class:{} options:{}. Compaction service will not start any new compactions until its configuration is fixed.",
serviceId, plannerClassName, options, e);
Level level = getLevel(tableId, serviceId, plannerInitErrorCache);
log.atLevel(level).setCause(e).log(
"Failed to create compaction planner for {} using class:{} options:{}. Compaction service will not "
+ "start any new compactions until its configuration is fixed. This log message is temporarily "
+ "suppressed for the entire table {}.",
serviceId, plannerClassName, options, tableId);
planner = new ProvisionalCompactionPlanner(serviceId);
}
return planner;
}

private Collection<CompactionJob> planCompactions(CompactionPlanner planner,
CompactionPlanner.PlanningParameters params, CompactionServiceId serviceId) {
try {
return planner.makePlan(params).getJobs();
} catch (Exception e) {
Level level = getLevel(params.getTableId(), serviceId, planningErrorCache);
log.atLevel(level).setCause(e).log(
"Failed to plan compactions for {} {} hints:{}. Compaction service may not start any new compactions"
+ " until this issue is resolved. This log message is temporarily suppressed for the entire table {}.",
serviceId, params.getKind(), params.getExecutionHints(), params.getTableId());
return Set.of();
}
}
}
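The core change in this class is that every call into a user-supplied planner is now wrapped in a try/catch, so a misbehaving plugin degrades to "no compactions planned" instead of throwing out of generateJobs() and blocking tablet assignment. A stripped-down sketch of that defensive pattern (the Plugin interface and names here are illustrative, not Accumulo API):

```java
import java.util.Collections;
import java.util.Set;

public class SafePluginCall {
    // Stand-in for a user-configured CompactionPlanner
    interface Plugin {
        Set<String> plan();
    }

    // Invoke the plugin; on any runtime failure, contain the exception and
    // fall back to an empty result so the caller is never disrupted.
    static Set<String> planSafely(Plugin plugin) {
        try {
            return plugin.plan();
        } catch (RuntimeException e) {
            // In the PR this is where the rate-limited
            // log.atLevel(level).setCause(e) call happens.
            return Collections.emptySet();
        }
    }

    public static void main(String[] args) {
        Plugin failing = () -> { throw new IllegalStateException("error planning"); };
        Plugin working = () -> Set.of("job1");
        System.out.println(planSafely(failing).size()); // 0 -- failure contained
        System.out.println(planSafely(working).size()); // 1
    }
}
```

Returning Set.of() on failure matches what the new planCompactions() overload does, and the createPlanner() path makes the analogous move by substituting a ProvisionalCompactionPlanner when init() throws.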
@@ -152,6 +152,21 @@ public CompactionPlan makePlan(PlanningParameters params) {
}
}

public static class ErroringPlanner implements CompactionPlanner {
@Override
public void init(InitParameters params) {
if (Boolean.parseBoolean(params.getOptions().getOrDefault("failInInit", "false"))) {
throw new IllegalStateException("error initializing");
}
}

@Override
public CompactionPlan makePlan(PlanningParameters params) {
throw new IllegalStateException("error planning");
}
}

public static class CompactionExecutorITConfig implements MiniClusterConfigurationCallback {
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf) {
@@ -177,6 +192,16 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf)
cfg.setProperty(csp + "cs4.planner.opts.filesPerCompaction", "11");
cfg.setProperty(csp + "cs4.planner.opts.process", "USER");

// Set up three planners that fail to initialize or plan; these planners should not impede
// tablet assignment.
cfg.setProperty(csp + "cse1.planner", ErroringPlanner.class.getName());
cfg.setProperty(csp + "cse1.planner.opts.failInInit", "true");

cfg.setProperty(csp + "cse2.planner", ErroringPlanner.class.getName());
cfg.setProperty(csp + "cse2.planner.opts.failInInit", "false");

cfg.setProperty(csp + "cse3.planner", "NonExistentPlanner20240522");

// this is meant to be dynamically reconfigured
cfg.setProperty(csp + "recfg.planner", TestPlanner.class.getName());
cfg.setProperty(csp + "recfg.planner.opts.groups", "[{'group':'i1'},{'group':'i2'}]");
@@ -230,6 +255,35 @@ public void cleanup() {
}
}

@Test
public void testFailingPlanners() throws Exception {
// This test ensures that a table w/ failing compaction planner can still be read and written.

try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
createTable(client, "fail1", "cse1");
createTable(client, "fail2", "cse2");
createTable(client, "fail3", "cse3");

// ensure tablets can still be assigned and written w/ failing compaction services
addFiles(client, "fail1", 30);
addFiles(client, "fail2", 30);
addFiles(client, "fail3", 30);

// ensure tablets can still be assigned and scanned w/ failing compaction services
assertEquals(30, scanTable(client, "fail1").size());
assertEquals(30, scanTable(client, "fail2").size());
assertEquals(30, scanTable(client, "fail3").size());

// compactions should never run on these tables, but sleep a bit to be sure
Thread.sleep(2000);

// do not expect any compactions to run
assertEquals(30, getFiles(client, "fail1").size());
assertEquals(30, getFiles(client, "fail2").size());
assertEquals(30, getFiles(client, "fail3").size());
}
}

Contributor: FYI, if you want to test that the ERROR logging is being emitted (that the escalation is happening), you can look at https://github.com/apache/accumulo/blob/main/core/src/test/java/org/apache/accumulo/core/logging/EscalatingLoggerTest.java. I was able to figure out how to programmatically change the logger and capture the output in a Java object that you can then test.

Contributor Author: Nice, I have wanted to check logging in unit tests in the past but did not know how. Looked around and there is currently no unit test for CompactionJobGenerator; one would need to be added to test logging. Could be something to look into as a follow-on.
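The log-capture technique the reviewer describes, in a self-contained form: register a custom handler that stores log records in a Java object, then assert on what was emitted and at which level. This sketch uses java.util.logging as a stdlib stand-in for the project's slf4j/log4j stack (an assumption; the class and logger names here are illustrative, not from the PR):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class LogCaptureSketch {
    // Handler that stores every record it receives, for later inspection
    static class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();
        @Override public void publish(LogRecord record) { records.add(record); }
        @Override public void flush() {}
        @Override public void close() {}
    }

    public static void main(String[] args) {
        Logger logger = Logger.getLogger("compaction.test");
        logger.setUseParentHandlers(false); // keep output in our handler only
        CapturingHandler capture = new CapturingHandler();
        logger.addHandler(capture);
        logger.setLevel(Level.ALL); // capture even the demoted messages

        logger.log(Level.SEVERE, "planner init failed");
        logger.log(Level.FINEST, "planner init failed (suppressed)");

        // A test can now assert that the first message escalated to SEVERE
        // while the repeat was demoted, analogous to ERROR vs TRACE in slf4j.
        System.out.println(capture.records.size());            // 2
        System.out.println(capture.records.get(0).getLevel()); // SEVERE
    }
}
```

With log4j2 the equivalent is wiring a custom Appender into the LoggerContext, which is what the EscalatingLoggerTest linked above appears to do.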

@Test
public void testReconfigureCompactionService() throws Exception {
Stream.of("i1", "i2").forEach(g -> {