Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handles failure in compaction planner #4586

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public enum CacheName {
COMPACTION_CONFIGS,
COMPACTION_DIR_CACHE,
COMPACTION_DISPATCHERS,
COMPACTION_SERVICE_UNKNOWN,
COMPACTOR_GROUP_ID,
COMPRESSION_ALGORITHM,
CRYPT_PASSWORDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.accumulo.server.compaction;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -26,7 +27,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.PluginEnvironment;
Expand All @@ -35,6 +35,7 @@
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.logging.ConditionalLogger;
import org.apache.accumulo.core.metadata.CompactableFileImpl;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.spi.common.ServiceEnvironment;
Expand All @@ -45,7 +46,6 @@
import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
import org.apache.accumulo.core.spi.compaction.CompactionServices;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.cache.Caches;
import org.apache.accumulo.core.util.cache.Caches.CacheName;
import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
Expand All @@ -55,18 +55,25 @@
import org.apache.accumulo.core.util.time.SteadyTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import com.github.benmanes.caffeine.cache.Cache;

public class CompactionJobGenerator {
private static final Logger log = LoggerFactory.getLogger(CompactionJobGenerator.class);
private static final Logger UNKNOWN_SERVICE_ERROR_LOG =
new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR);
private static final Logger PLANNING_INIT_ERROR_LOG =
new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR);
private static final Logger PLANNING_ERROR_LOG =
new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR);

private final CompactionServicesConfig servicesConfig;
private final Map<CompactionServiceId,CompactionPlanner> planners = new HashMap<>();
private final Cache<TableId,CompactionDispatcher> dispatchers;
private final Set<CompactionServiceId> serviceIds;
private final PluginEnvironment env;
private final Map<FateId,Map<String,String>> allExecutionHints;
private final Cache<Pair<TableId,CompactionServiceId>,Long> unknownCompactionServiceErrorCache;
private final SteadyTime steadyTime;

public CompactionJobGenerator(PluginEnvironment env,
Expand All @@ -87,22 +94,15 @@ public CompactionJobGenerator(PluginEnvironment env,
executionHints.forEach((k, v) -> allExecutionHints.put(k,
v.isEmpty() ? Map.of() : Collections.unmodifiableMap(v)));
}
unknownCompactionServiceErrorCache =
Caches.getInstance().createNewBuilder(CacheName.COMPACTION_SERVICE_UNKNOWN, false)
.expireAfterWrite(5, TimeUnit.MINUTES).build();

this.steadyTime = steadyTime;
}

public Collection<CompactionJob> generateJobs(TabletMetadata tablet, Set<CompactionKind> kinds) {

// ELASTICITY_TODO do not want user configured plugins to cause exceptions that prevents tablets
// from being
// assigned. So probably want to catch exceptions and log, but not too spammily OR some how
// report something
// back to the manager so it can log.

Collection<CompactionJob> systemJobs = Set.of();

log.debug("Planning for {} {} {}", tablet.getExtent(), kinds, this.hashCode());

if (kinds.contains(CompactionKind.SYSTEM)) {
CompactionServiceId serviceId = dispatch(CompactionKind.SYSTEM, tablet, Map.of());
systemJobs = planCompactions(serviceId, CompactionKind.SYSTEM, tablet, Map.of());
Expand Down Expand Up @@ -166,19 +166,11 @@ private Collection<CompactionJob> planCompactions(CompactionServiceId serviceId,
CompactionKind kind, TabletMetadata tablet, Map<String,String> executionHints) {

if (!servicesConfig.getPlanners().containsKey(serviceId.canonical())) {
var cacheKey = new Pair<>(tablet.getTableId(), serviceId);
var last = unknownCompactionServiceErrorCache.getIfPresent(cacheKey);
if (last == null) {
// have not logged an error recently for this, so lets log one
log.error(
"Tablet {} returned non-existent compaction service {} for compaction type {}. Check"
+ " the table compaction dispatcher configuration. No compactions will happen"
+ " until the configuration is fixed. This log message is temporarily suppressed for the"
+ " entire table.",
tablet.getExtent(), serviceId, kind);
unknownCompactionServiceErrorCache.put(cacheKey, System.currentTimeMillis());
}

UNKNOWN_SERVICE_ERROR_LOG.trace(
"Table {} returned non-existent compaction service {} for compaction type {}. Check"
+ " the table compaction dispatcher configuration. No compactions will happen"
+ " until the configuration is fixed. This log message is temporarily suppressed.",
tablet.getExtent().tableId(), serviceId, kind);
return Set.of();
}

Expand Down Expand Up @@ -299,8 +291,7 @@ public CompactionPlan.Builder createPlanBuilder() {
return new CompactionPlanImpl.BuilderImpl(kind, allFiles, candidates);
}
};

return planner.makePlan(params).getJobs();
return planCompactions(planner, params, serviceId);
}

private CompactionPlanner createPlanner(TableId tableId, CompactionServiceId serviceId) {
Expand All @@ -317,11 +308,27 @@ private CompactionPlanner createPlanner(TableId tableId, CompactionServiceId ser
servicesConfig.getOptions().get(serviceId.canonical()), (ServiceEnvironment) env);
planner.init(initParameters);
} catch (Exception e) {
log.error(
"Failed to create compaction planner for {} using class:{} options:{}. Compaction service will not start any new compactions until its configuration is fixed.",
serviceId, plannerClassName, options, e);
PLANNING_INIT_ERROR_LOG.trace(
"Failed to create compaction planner for service:{} tableId:{} using class:{} options:{}. Compaction "
+ "service will not start any new compactions until its configuration is fixed. This log message is "
+ "temporarily suppressed.",
serviceId, tableId, plannerClassName, options, e);
planner = new ProvisionalCompactionPlanner(serviceId);
}
return planner;
}

private Collection<CompactionJob> planCompactions(CompactionPlanner planner,
    CompactionPlanner.PlanningParameters params, CompactionServiceId serviceId) {
  // Invoke the user-supplied planner, guarding against plugin exceptions so that a
  // misbehaving planner cannot prevent other tablet activity.
  Collection<CompactionJob> jobs;
  try {
    jobs = planner.makePlan(params).getJobs();
  } catch (Exception ex) {
    // Logged through an escalating logger: repeated failures are rate limited at first
    // and escalated if they persist.
    PLANNING_ERROR_LOG.trace(
        "Failed to plan compactions for service:{} kind:{} tableId:{} hints:{}. Compaction service may not start any"
            + " new compactions until this issue is resolved. Duplicates of this log message are temporarily"
            + " suppressed.",
        serviceId, params.getKind(), params.getTableId(), params.getExecutionHints(), ex);
    jobs = Collections.emptySet();
  }
  return jobs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,21 @@ public CompactionPlan makePlan(PlanningParameters params) {
}
}

/**
 * A test planner that fails on demand: it throws from {@code init} when the
 * {@code failInInit} option is set to {@code true}, and always throws from
 * {@code makePlan}. Used to verify planner failures do not impede tablets.
 */
public static class ErroringPlanner implements CompactionPlanner {
  @Override
  public void init(InitParameters params) {
    String failInInit = params.getOptions().getOrDefault("failInInit", "false");
    if (Boolean.parseBoolean(failInInit)) {
      throw new IllegalStateException("error initializing");
    }
  }

  @Override
  public CompactionPlan makePlan(PlanningParameters params) {
    // Always fail planning to simulate a buggy user-supplied planner.
    throw new IllegalStateException("error planning");
  }
}

public static class CompactionExecutorITConfig implements MiniClusterConfigurationCallback {
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf) {
Expand All @@ -177,6 +192,16 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf)
cfg.setProperty(csp + "cs4.planner.opts.filesPerCompaction", "11");
cfg.setProperty(csp + "cs4.planner.opts.process", "USER");

// Set up three planners that fail to initialize or plan; these planners should not
// impede tablet assignment.
cfg.setProperty(csp + "cse1.planner", ErroringPlanner.class.getName());
cfg.setProperty(csp + "cse1.planner.opts.failInInit", "true");

cfg.setProperty(csp + "cse2.planner", ErroringPlanner.class.getName());
cfg.setProperty(csp + "cse2.planner.opts.failInInit", "false");

cfg.setProperty(csp + "cse3.planner", "NonExistentPlanner20240522");

// this is meant to be dynamically reconfigured
cfg.setProperty(csp + "recfg.planner", TestPlanner.class.getName());
cfg.setProperty(csp + "recfg.planner.opts.groups", "[{'group':'i1'},{'group':'i2'}]");
Expand Down Expand Up @@ -230,6 +255,35 @@ public void cleanup() {
}
}

@Test
public void testFailingPlanners() throws Exception {
  // Ensures tables whose compaction planners fail (in init, in planning, or via a
  // nonexistent planner class) can still be assigned, written, and read.

  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    String[] tables = {"fail1", "fail2", "fail3"};
    String[] services = {"cse1", "cse2", "cse3"};

    for (int i = 0; i < tables.length; i++) {
      createTable(client, tables[i], services[i]);
    }

    // ensure tablets can still be assigned and written w/ failing compaction services
    for (String table : tables) {
      addFiles(client, table, 30);
    }

    // ensure tablets can still be assigned and scanned w/ failing compaction services
    for (String table : tables) {
      assertEquals(30, scanTable(client, table).size());
    }

    // compactions should never run on these tables, but sleep a bit to be sure
    Thread.sleep(2000);

    // do not expect any compactions to have run
    for (String table : tables) {
      assertEquals(30, getFiles(client, table).size());
    }
  }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, if you wanted to test that the ERROR logging is being emitted (that the escalation is happening), you can look at https://github.com/apache/accumulo/blob/main/core/src/test/java/org/apache/accumulo/core/logging/EscalatingLoggerTest.java. I was able to figure out how to programmatically change the logger and capture the output in a Java object that you can then test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I have wanted to check logging in unit test in the past but did not know how. Looked around and there is currently no unit test for CompactionJobGenerator. Would need to add one to test logging, could be something to look into as a follow on.

@Test
public void testReconfigureCompactionService() throws Exception {
Stream.of("i1", "i2").forEach(g -> {
Expand Down