Skip to content

Commit

Permalink
Log message when Tablet has been unloading for over 15 minutes (#4558)
Browse files Browse the repository at this point in the history
Created an abstract ConditionalLogger class with two implementations. The
EscalatingLogger will conditionally log at a higher level and the
deduplicating logger will conditionally suppress log messages. Wired up the
deduplicating logger in the UnloadTabletHandler to suppress multiple invocations
of unload and wired up the escalating logger in the TabletGroupWatcher when
the same tablet has been requested to be unloaded.

Closes #4539
  • Loading branch information
dlmarion authored May 24, 2024
1 parent 5e48ff9 commit 221259e
Show file tree
Hide file tree
Showing 7 changed files with 385 additions and 5 deletions.
16 changes: 16 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,27 @@
<artifactId>hadoop-client-runtime</artifactId>
<scope>runtime</scope>
</dependency>
<!-- bnd dependency added due to lint issue, see https://github.com/apache/logging-log4j2/issues/2232 -->
<dependency>
<groupId>biz.aQute.bnd</groupId>
<artifactId>biz.aQute.bnd.annotation</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-minicluster</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.logging;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;

import org.apache.accumulo.core.util.Pair;
import org.slf4j.Logger;
import org.slf4j.Marker;
import org.slf4j.event.Level;
import org.slf4j.helpers.AbstractLogger;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

/**
* Logger that wraps another Logger and only emits a log message once per the supplied duration.
*
*/
public abstract class ConditionalLogger extends AbstractLogger {

private static final long serialVersionUID = 1L;

/**
* A Logger implementation that will log a message at the supplied elevated level if it has not
* been seen in the supplied duration. For repeat occurrences the message will be logged at the
* level used in code (which is likely a lower level). Note that the first log message will be
* logged at the elevated level because it has not been seen before.
*/
public static class EscalatingLogger extends DeduplicatingLogger {

private static final long serialVersionUID = 1L;
private final Level elevatedLevel;

public EscalatingLogger(Logger log, Duration threshold, long maxCachedLogMessages,
Level elevatedLevel) {
super(log, threshold, maxCachedLogMessages);
this.elevatedLevel = elevatedLevel;
}

@Override
protected void handleNormalizedLoggingCall(Level level, Marker marker, String messagePattern,
Object[] arguments, Throwable throwable) {

if (arguments == null) {
arguments = new Object[0];
}
if (!condition.apply(messagePattern, Arrays.asList(arguments))) {
delegate.atLevel(level).addMarker(marker).setCause(throwable).log(messagePattern,
arguments);
} else {
delegate.atLevel(elevatedLevel).addMarker(marker).setCause(throwable).log(messagePattern,
arguments);
}

}

}

/**
* A Logger implementation that will suppress duplicate messages within the supplied duration.
*/
public static class DeduplicatingLogger extends ConditionalLogger {

private static final long serialVersionUID = 1L;

public DeduplicatingLogger(Logger log, Duration threshold, long maxCachedLogMessages) {
super(log, new BiFunction<>() {

private final Cache<Pair<String,List<Object>>,Boolean> cache = Caffeine.newBuilder()
.expireAfterWrite(threshold).maximumSize(maxCachedLogMessages).build();

private final ConcurrentMap<Pair<String,List<Object>>,Boolean> cacheMap = cache.asMap();

/**
* Function that will return true if the message has not been seen in the supplied duration.
*
* @param msg log message
* @param args log message arguments
* @return true if message has not been seen in duration, else false.
*/
@Override
public Boolean apply(String msg, List<Object> args) {
return cacheMap.putIfAbsent(new Pair<>(msg, args), true) == null;
}

});
}

}

protected final Logger delegate;
protected final BiFunction<String,List<Object>,Boolean> condition;

protected ConditionalLogger(Logger log, BiFunction<String,List<Object>,Boolean> condition) {
// this.delegate = new DelegateWrapper(log);
this.delegate = log;
this.condition = condition;
}

@Override
public boolean isTraceEnabled() {
return this.delegate.isTraceEnabled();
}

@Override
public boolean isTraceEnabled(Marker marker) {
return this.delegate.isTraceEnabled(marker);
}

@Override
public boolean isDebugEnabled() {
return this.delegate.isDebugEnabled();
}

@Override
public boolean isDebugEnabled(Marker marker) {
return this.delegate.isDebugEnabled(marker);
}

@Override
public boolean isInfoEnabled() {
return this.delegate.isInfoEnabled();
}

@Override
public boolean isInfoEnabled(Marker marker) {
return this.delegate.isInfoEnabled(marker);
}

@Override
public boolean isWarnEnabled() {
return this.delegate.isWarnEnabled();
}

@Override
public boolean isWarnEnabled(Marker marker) {
return this.delegate.isWarnEnabled(marker);
}

@Override
public boolean isErrorEnabled() {
return this.delegate.isErrorEnabled();
}

@Override
public boolean isErrorEnabled(Marker marker) {
return this.delegate.isErrorEnabled(marker);
}

@Override
public String getName() {
return this.delegate.getName();
}

@Override
protected String getFullyQualifiedCallerName() {
return this.delegate.getName();
}

@Override
protected void handleNormalizedLoggingCall(Level level, Marker marker, String messagePattern,
Object[] arguments, Throwable throwable) {

if (arguments == null) {
arguments = new Object[0];
}
if (condition.apply(messagePattern, Arrays.asList(arguments))) {
delegate.atLevel(level).addMarker(marker).setCause(throwable).log(messagePattern, arguments);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.logging;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.StringWriter;
import java.time.Duration;

import org.apache.accumulo.core.logging.ConditionalLogger.DeduplicatingLogger;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.WriterAppender;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.layout.PatternLayout;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeduplicatingLoggerTest {

private static final Logger LOG = LoggerFactory.getLogger(DeduplicatingLoggerTest.class);
private static final Logger TEST_LOGGER =
new DeduplicatingLogger(LOG, Duration.ofMinutes(1), 100);

@Test
public void test() {

StringWriter writer = new StringWriter();

// Programatically modify the Log4j2 Logging configuration to add an appender
LoggerContext ctx = LoggerContext.getContext(false);
Configuration cfg = ctx.getConfiguration();
PatternLayout layout = PatternLayout.createDefaultLayout(cfg);
Appender appender = WriterAppender.createAppender(layout, null, writer,
"DeduplicatingLoggerTestAppender", false, true);
appender.start();
cfg.addAppender(appender);
cfg.getLoggerConfig(DeduplicatingLoggerTest.class.getName()).addAppender(appender, null, null);

TEST_LOGGER.error("ERROR TEST");
TEST_LOGGER.warn("WARN TEST");
assertEquals(1, StringUtils.countMatches(writer.toString(), "ERROR TEST"));
assertEquals(1, StringUtils.countMatches(writer.toString(), "WARN TEST"));
TEST_LOGGER.error("ERROR TEST");
TEST_LOGGER.warn("WARN TEST");
assertEquals(1, StringUtils.countMatches(writer.toString(), "ERROR TEST"));
assertEquals(1, StringUtils.countMatches(writer.toString(), "WARN TEST"));

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.logging;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.StringWriter;
import java.time.Duration;

import org.apache.accumulo.core.logging.ConditionalLogger.EscalatingLogger;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.WriterAppender;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.layout.PatternLayout;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

public class EscalatingLoggerTest {

private static final Logger LOG = LoggerFactory.getLogger(EscalatingLoggerTest.class);
private static final Logger TEST_LOGGER =
new EscalatingLogger(LOG, Duration.ofSeconds(3), 100, Level.WARN);

@Test
public void test() throws InterruptedException {

StringWriter writer = new StringWriter();

// Programatically modify the Log4j2 Logging configuration to add an appender
LoggerContext ctx = LoggerContext.getContext(false);
Configuration cfg = ctx.getConfiguration();
PatternLayout layout = PatternLayout.newBuilder().withConfiguration(cfg)
.withPattern(PatternLayout.SIMPLE_CONVERSION_PATTERN).build();
Appender appender = WriterAppender.createAppender(layout, null, writer,
"EscalatingLoggerTestAppender", false, true);
appender.start();
cfg.addAppender(appender);
cfg.getLoggerConfig(EscalatingLoggerTest.class.getName()).addAppender(appender, null, null);

TEST_LOGGER.info("TEST MESSAGE");
TEST_LOGGER.info("TEST MESSAGE");
TEST_LOGGER.info("TEST MESSAGE");
TEST_LOGGER.info("TEST MESSAGE");

assertEquals(1, StringUtils.countMatches(writer.toString(), "WARN"));
assertEquals(3, StringUtils.countMatches(writer.toString(), "INFO"));

Thread.sleep(5000);

TEST_LOGGER.info("TEST MESSAGE");

assertEquals(2, StringUtils.countMatches(writer.toString(), "WARN"));
assertEquals(3, StringUtils.countMatches(writer.toString(), "INFO"));

}

}
Loading

0 comments on commit 221259e

Please sign in to comment.