Skip to content

Commit

Permalink
code review update
Browse files Browse the repository at this point in the history
  • Loading branch information
keith-turner committed May 29, 2024
1 parent 526cd66 commit 5500afb
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ public enum CacheName {
COMPACTION_CONFIGS,
COMPACTION_DIR_CACHE,
COMPACTION_DISPATCHERS,
COMPACTION_SERVICE_UNKNOWN,
COMPACTION_PLANNER_INIT_FAILED,
COMPACTION_PLANNER_FAILED,
COMPACTOR_GROUP_ID,
COMPRESSION_ALGORITHM,
CRYPT_PASSWORDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.accumulo.server.compaction;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -26,7 +27,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.PluginEnvironment;
Expand All @@ -35,6 +35,7 @@
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.logging.ConditionalLogger;
import org.apache.accumulo.core.metadata.CompactableFileImpl;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.spi.common.ServiceEnvironment;
Expand All @@ -45,7 +46,6 @@
import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
import org.apache.accumulo.core.spi.compaction.CompactionServices;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.cache.Caches;
import org.apache.accumulo.core.util.cache.Caches.CacheName;
import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
Expand All @@ -61,15 +61,19 @@

public class CompactionJobGenerator {
private static final Logger log = LoggerFactory.getLogger(CompactionJobGenerator.class);
private static final Logger UNKNOWN_SERVICE_ERROR_LOG =
new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR);
private static final Logger PLANNING_INIT_ERROR_LOG =
new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR);
private static final Logger PLANNING_ERROR_LOG =
new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR);

private final CompactionServicesConfig servicesConfig;
private final Map<CompactionServiceId,CompactionPlanner> planners = new HashMap<>();
private final Cache<TableId,CompactionDispatcher> dispatchers;
private final Set<CompactionServiceId> serviceIds;
private final PluginEnvironment env;
private final Map<FateId,Map<String,String>> allExecutionHints;
private final Cache<Pair<TableId,CompactionServiceId>,Long> unknownCompactionServiceErrorCache;
private final Cache<Pair<TableId,CompactionServiceId>,Long> plannerInitErrorCache;
private final Cache<Pair<TableId,CompactionServiceId>,Long> planningErrorCache;
private final SteadyTime steadyTime;

public CompactionJobGenerator(PluginEnvironment env,
Expand All @@ -90,15 +94,7 @@ public CompactionJobGenerator(PluginEnvironment env,
executionHints.forEach((k, v) -> allExecutionHints.put(k,
v.isEmpty() ? Map.of() : Collections.unmodifiableMap(v)));
}
unknownCompactionServiceErrorCache =
Caches.getInstance().createNewBuilder(CacheName.COMPACTION_SERVICE_UNKNOWN, false)
.expireAfterWrite(5, TimeUnit.MINUTES).build();
plannerInitErrorCache =
Caches.getInstance().createNewBuilder(CacheName.COMPACTION_PLANNER_INIT_FAILED, false)
.expireAfterWrite(5, TimeUnit.MINUTES).build();
planningErrorCache =
Caches.getInstance().createNewBuilder(CacheName.COMPACTION_PLANNER_FAILED, false)
.expireAfterWrite(5, TimeUnit.MINUTES).build();

this.steadyTime = steadyTime;
}

Expand Down Expand Up @@ -166,29 +162,15 @@ public Map<String,String> getExecutionHints() {
return dispatcher.dispatch(dispatchParams).getService();
}

private Level getLevel(TableId tableId, CompactionServiceId serviceId,
Cache<Pair<TableId,CompactionServiceId>,Long> lastErrorCache) {
var cacheKey = new Pair<>(tableId, serviceId);
var last = lastErrorCache.getIfPresent(cacheKey);
if (last == null) {
lastErrorCache.put(cacheKey, System.currentTimeMillis());
return Level.ERROR;
} else {
return Level.TRACE;
}
}

private Collection<CompactionJob> planCompactions(CompactionServiceId serviceId,
CompactionKind kind, TabletMetadata tablet, Map<String,String> executionHints) {

if (!servicesConfig.getPlanners().containsKey(serviceId.canonical())) {
Level level = getLevel(tablet.getTableId(), serviceId, unknownCompactionServiceErrorCache);
log.atLevel(level).log(
"Tablet {} returned non-existent compaction service {} for compaction type {}. Check"
UNKNOWN_SERVICE_ERROR_LOG.trace(
"Table {} returned non-existent compaction service {} for compaction type {}. Check"
+ " the table compaction dispatcher configuration. No compactions will happen"
+ " until the configuration is fixed. This log message is temporarily suppressed for the"
+ " entire table.",
tablet.getExtent(), serviceId, kind);
+ " until the configuration is fixed. This log message is temporarily suppressed.",
tablet.getExtent().tableId(), serviceId, kind);
return Set.of();
}

Expand Down Expand Up @@ -326,12 +308,11 @@ private CompactionPlanner createPlanner(TableId tableId, CompactionServiceId ser
servicesConfig.getOptions().get(serviceId.canonical()), (ServiceEnvironment) env);
planner.init(initParameters);
} catch (Exception e) {
Level level = getLevel(tableId, serviceId, plannerInitErrorCache);
log.atLevel(level).setCause(e).log(
"Failed to create compaction planner for {} using class:{} options:{}. Compaction service will not "
+ "start any new compactions until its configuration is fixed. This log message is temporarily "
+ "suppressed for the entire table {}.",
serviceId, plannerClassName, options, tableId);
PLANNING_INIT_ERROR_LOG.trace(
"Failed to create compaction planner for service:{} tableId:{} using class:{} options:{}. Compaction "
+ "service will not start any new compactions until its configuration is fixed. This log message is "
+ "temporarily suppressed.",
serviceId, tableId, plannerClassName, options, e);
planner = new ProvisionalCompactionPlanner(serviceId);
}
return planner;
Expand All @@ -342,11 +323,11 @@ private Collection<CompactionJob> planCompactions(CompactionPlanner planner,
try {
return planner.makePlan(params).getJobs();
} catch (Exception e) {
Level level = getLevel(params.getTableId(), serviceId, planningErrorCache);
log.atLevel(level).setCause(e).log(
"Failed to plan compactions for {} {} hints:{}. Compaction service may not start any new compactions"
+ " until this issue is resolved. This log message is temporarily suppressed for the entire table. {}",
serviceId, params.getKind(), params.getExecutionHints(), params.getTableId());
PLANNING_ERROR_LOG.trace(
"Failed to plan compactions for service:{} kind:{} tableId:{} hints:{}. Compaction service may not start any"
+ " new compactions until this issue is resolved. Duplicates of this log message are temporarily"
+ " suppressed.",
serviceId, params.getKind(), params.getTableId(), params.getExecutionHints(), e);
return Set.of();
}
}
Expand Down

0 comments on commit 5500afb

Please sign in to comment.