Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flush reporter on shutdown #397

Merged
merged 1 commit into from
Jan 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.concurrent.Future;
Expand All @@ -49,6 +51,8 @@
*/
public class ApmServerReporter implements Reporter {

private static final Logger logger = LoggerFactory.getLogger(ApmServerReporter.class);

private static final EventTranslatorOneArg<ReportingEvent, Transaction> TRANSACTION_EVENT_TRANSLATOR = new EventTranslatorOneArg<ReportingEvent, Transaction>() {
@Override
public void translateTo(ReportingEvent event, long sequence, Transaction t) {
Expand All @@ -73,6 +77,12 @@ public void translateTo(ReportingEvent event, long sequence, ErrorCapture error)
event.setError(error);
}
};
private static final EventTranslator<ReportingEvent> SHUTDOWN_EVENT_TRANSLATOR = new EventTranslator<ReportingEvent>() {
@Override
public void translateTo(ReportingEvent event, long sequence) {
event.shutdownEvent();
}
};

private final Disruptor<ReportingEvent> disruptor;
private final AtomicLong dropped = new AtomicLong();
Expand Down Expand Up @@ -206,7 +216,12 @@ private boolean isEventProcessed(long sequence) {

@Override
public void close() {
disruptor.shutdown();
disruptor.publishEvent(SHUTDOWN_EVENT_TRANSLATOR);
try {
disruptor.shutdown(5, TimeUnit.SECONDS);
} catch (com.lmax.disruptor.TimeoutException e) {
logger.warn("Timeout while shutting down disruptor");
}
reportingEventHandler.close();
if (metricsReportingScheduler != null) {
metricsReportingScheduler.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class IntakeV2ReportingEventHandler implements ReportingEventHandler {
private TimerTask timeoutTask;
private int errorCount;
private long gracePeriodEnd;
private boolean shutDown;

public IntakeV2ReportingEventHandler(Service service, ProcessInfo process, SystemInfo system,
ReporterConfiguration reporterConfiguration, ProcessorEventHandler processorEventHandler,
Expand Down Expand Up @@ -142,7 +143,9 @@ public void onEvent(ReportingEvent event, long sequence, boolean endOfBatch) {
logger.debug("Receiving {} event (sequence {})", event.getType(), sequence);
}
try {
handleEvent(event, sequence, endOfBatch);
if (!shutDown) {
handleEvent(event, sequence, endOfBatch);
}
} finally {
event.resetState();
}
Expand All @@ -155,6 +158,10 @@ private void handleEvent(ReportingEvent event, long sequence, boolean endOfBatch
} else if (event.getType() == ReportingEvent.ReportingEventType.FLUSH) {
flush();
return;
} else if (event.getType() == ReportingEvent.ReportingEventType.SHUTDOWN) {
shutDown = true;
flush();
return;
}
processorEventHandler.onEvent(event, sequence, endOfBatch);
if (connection == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.ERROR;
import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.FLUSH;
import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.METRICS;
import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.SHUTDOWN;
import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.SPAN;
import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.TRANSACTION;

Expand Down Expand Up @@ -96,12 +97,16 @@ public void reportMetrics(MetricRegistry metricRegistry) {
this.type = METRICS;
}

public void shutdownEvent() {
this.type = SHUTDOWN;
}

@Nullable
public MetricRegistry getMetricRegistry() {
return metricRegistry;
}

enum ReportingEventType {
FLUSH, TRANSACTION, SPAN, ERROR, METRICS
FLUSH, TRANSACTION, SPAN, ERROR, METRICS, SHUTDOWN
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ void testReport() {
assertThat(ndJsonNodes.get(3).get("error")).isNotNull();
}

@Test
void testShutDown() {
reportTransaction();
sendShutdownEvent();
reportSpan();
reportingEventHandler.flush();

final List<JsonNode> ndJsonNodes = getNdJsonNodes();
assertThat(ndJsonNodes).hasSize(2);
assertThat(ndJsonNodes.get(0).get("metadata")).isNotNull();
assertThat(ndJsonNodes.get(1).get("transaction")).isNotNull();
}

@Test
void testReportRoundRobinOnServerError() {
mockApmServer1.stubFor(post(INTAKE_V2_URL).willReturn(serviceUnavailable()));
Expand Down Expand Up @@ -191,6 +204,12 @@ private void reportError() {
reportingEventHandler.onEvent(reportingEvent, -1, true);
}

private void sendShutdownEvent() {
final ReportingEvent reportingEvent = new ReportingEvent();
reportingEvent.shutdownEvent();
reportingEventHandler.onEvent(reportingEvent, -1, true);
}

private List<JsonNode> getNdJsonNodes() {
return Stream.of(mockApmServer1, mockApmServer2)
.flatMap(apmServer -> apmServer.findAll(postRequestedFor(urlEqualTo(INTAKE_V2_URL))).stream())
Expand Down