Skip to content
This repository was archived by the owner on Jul 1, 2022. It is now read-only.

Log SenderExceptions the first time they occur in a row #729

Merged
merged 1 commit into from
Jul 31, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -21,6 +21,8 @@
import io.jaegertracing.internal.senders.SenderResolver;
import io.jaegertracing.spi.Reporter;
import io.jaegertracing.spi.Sender;
import java.util.HashSet;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
@@ -166,17 +168,29 @@ public void execute() throws SenderException {
@ToString
class QueueProcessor implements Runnable {
private boolean open = true;
private final Set<Class<?>> commandFailedBefore = new HashSet<Class<?>>();

@Override
public void run() {
while (open) {
try {
RemoteReporter.Command command = commandQueue.take();
Class<? extends Command> commandClass = command.getClass();
boolean failedBefore = commandFailedBefore.contains(commandClass);

try {
command.execute();
if (failedBefore) {
log.info(commandClass.getSimpleName() + " is working again!");
commandFailedBefore.remove(commandClass);
}
} catch (SenderException e) {
metrics.reporterFailure.inc(e.getDroppedSpanCount());
if (!failedBefore) {
log.warn(commandClass.getSimpleName()
+ " execution failed! Repeated errors of this command will not be logged.", e);
commandFailedBefore.add(commandClass);
}
}
} catch (Exception e) {
log.error("QueueProcessor error:", e);
Original file line number Diff line number Diff line change
@@ -22,6 +22,8 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -39,15 +41,21 @@
import io.jaegertracing.spi.Reporter;
import io.jaegertracing.spi.Sender;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.slf4j.LoggerFactory;

public class RemoteReporterTest {
@@ -215,15 +223,8 @@ public void testCloseWhenQueueFull() {
@Test
public void testCloseLogSenderException() throws SenderException {

// set up mocking
ch.qos.logback.classic.Logger root =
(ch.qos.logback.classic.Logger) LoggerFactory
.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME);

@SuppressWarnings("unchecked") final Appender<ILoggingEvent> mockAppender =
mock(Appender.class);
when(mockAppender.getName()).thenReturn("MOCK");
root.addAppender(mockAppender);
Appender<ILoggingEvent> mockAppender = mockLogger(e -> {
});

final Sender mockedSender = mock(Sender.class);
when(mockedSender.close()).thenThrow(SenderException.class);
@@ -322,6 +323,74 @@ public int flush() throws SenderException {
assertEquals("mySpan", (sender.getReceived().get(0)).getOperationName());
}

@Test
public void testFlushErrorsLoggedJustOnce() throws InterruptedException {

Object logMonitor = new Object();
AtomicReference<String> logMsg = new AtomicReference<>(null);
mockLogger(e -> {
synchronized (logMonitor) {
logMsg.set(e.getFormattedMessage());
logMonitor.notifyAll();
}
});

class FailingSender extends InMemorySender {
private final AtomicInteger flushCounter = new AtomicInteger(0);

@Override
public int flush() throws SenderException {
int i = super.flush();
switch (flushCounter.getAndIncrement()) {
case 1:
case 2:
case 3:
throw new SenderException("test1", super.flush());
default:
return i;
}
}

private String awaitMessage(AtomicReference<String> ref) throws InterruptedException {
synchronized (logMonitor) {
while (ref.get() == null) {
logMonitor.wait();
}
return ref.getAndSet(null);
}
}
}

FailingSender sender = new FailingSender();

RemoteReporter remoteReporter = new Builder()
.withSender(sender)
.withFlushInterval(Integer.MAX_VALUE)
.withMaxQueueSize(maxQueueSize)
.withMetrics(metrics)
.build();
tracer = new JaegerTracer.Builder("test-remote-reporter")
.withReporter(remoteReporter)
.withSampler(new ConstSampler(true))
.withMetrics(metrics)
.build();

tracer.buildSpan("mySpan").start().finish();
remoteReporter.flush();

tracer.buildSpan("mySpan").start().finish();
remoteReporter.flush();

assertEquals("FlushCommand execution failed! Repeated errors of this command will not be logged.",
sender.awaitMessage(logMsg));

remoteReporter.flush();
remoteReporter.flush();
remoteReporter.flush();
assertEquals("FlushCommand is working again!", sender.awaitMessage(logMsg));

}

@Test
public void testUpdateSuccessMetricWhenAppendFlushed() throws InterruptedException {
int totalSpans = 3;
@@ -396,4 +465,21 @@ public int append(JaegerSpan span) throws SenderException {
private JaegerSpan newSpan() {
return tracer.buildSpan("x").start();
}

private static Appender<ILoggingEvent> mockLogger(Consumer<ILoggingEvent> append) {
ch.qos.logback.classic.Logger root =
(ch.qos.logback.classic.Logger) LoggerFactory
.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME);

@SuppressWarnings("unchecked")
final Appender<ILoggingEvent> mockAppender = mock(Appender.class);
when(mockAppender.getName()).thenReturn("MOCK");
doAnswer(i -> {
append.accept(i.getArgument(0));
return null;
}).when(mockAppender).doAppend(ArgumentMatchers.any(ILoggingEvent.class));
root.addAppender(mockAppender);

return mockAppender;
}
}