Skip to content

Commit

Permalink
GH-1014: Add addMdcAsHeaders into appenders
Browse files Browse the repository at this point in the history
Fixes #1014

**Cherry-pick to 2.1.x, 2.0.x & 1.7.x**

GH-1014 minor changes

GH-1014 renamed property to addMdcAsHeaders

GH-1014 added addMdcAsHeaders into documentation

GH-1014 added addMdcAsHeaders into logback appender. added integration test

GH-1014 updated documentation

GH-1014 minor fix

GH-1014 updated documentation

GH-1014 minor fix

GH-1014 removed this prefix

* Made addMdcAsHeaders true by default

* Polishing
  • Loading branch information
egusev authored and artembilan committed Jun 13, 2019
1 parent b6f6b60 commit 925b8c6
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
* @author Artem Bilan
* @author Dominique Villard
* @author Nicolas Ristock
* @author Eugene Gusev
*
* @since 1.6
*/
Expand Down Expand Up @@ -183,7 +184,9 @@ public static AmqpAppender createAppender(// NOSONAR NCSS line count
@PluginAttribute("async") boolean async,
@PluginAttribute("charset") String charset,
@PluginAttribute(value = "bufferSize", defaultInt = Integer.MAX_VALUE) int bufferSize,
@PluginElement(BlockingQueueFactory.ELEMENT_TYPE) BlockingQueueFactory<Event> blockingQueueFactory) {
@PluginElement(BlockingQueueFactory.ELEMENT_TYPE) BlockingQueueFactory<Event> blockingQueueFactory,
@PluginAttribute(value = "addMdcAsHeaders", defaultBoolean = true) boolean addMdcAsHeaders) {

if (name == null) {
LOGGER.error("No name for AmqpAppender");
}
Expand Down Expand Up @@ -226,6 +229,7 @@ public static AmqpAppender createAppender(// NOSONAR NCSS line count
manager.clientConnectionProperties = clientConnectionProperties;
manager.charset = charset;
manager.async = async;
manager.addMdcAsHeaders = addMdcAsHeaders;

BlockingQueue<Event> eventQueue;
if (blockingQueueFactory == null) {
Expand Down Expand Up @@ -283,7 +287,7 @@ protected Message postProcessMessageBeforeSend(Message message, Event event) {
return message;
}

private void sendEvent(Event event, Map<?, ?> properties) {
protected void sendEvent(Event event, Map<?, ?> properties) {
LogEvent logEvent = event.getEvent();
String name = logEvent.getLoggerName();
Level level = logEvent.getLevel();
Expand Down Expand Up @@ -312,8 +316,10 @@ private void sendEvent(Event event, Map<?, ?> properties) {
amqpProps.setTimestamp(tstamp.getTime());

// Copy properties in from MDC
for (Entry<?, ?> entry : properties.entrySet()) {
amqpProps.setHeader(entry.getKey().toString(), entry.getValue());
if (this.manager.addMdcAsHeaders) {
for (Entry<?, ?> entry : properties.entrySet()) {
amqpProps.setHeader(entry.getKey().toString(), entry.getValue());
}
}
if (logEvent.getSource() != null) {
amqpProps.setHeader(
Expand All @@ -326,7 +332,7 @@ private void sendEvent(Event event, Map<?, ?> properties) {
doSend(event, logEvent, amqpProps);
}

private void doSend(Event event, LogEvent logEvent, MessageProperties amqpProps) {
protected void doSend(Event event, LogEvent logEvent, MessageProperties amqpProps) {
StringBuilder msgBody;
String routingKey;
try {
Expand Down Expand Up @@ -605,6 +611,11 @@ protected static class AmqpManager extends AbstractManager {
*/
private String charset = Charset.defaultCharset().name();

/**
* Whether or not add MDC properties into message headers. true by default for backward compatibility
*/
private boolean addMdcAsHeaders = true;

private boolean durable = true;

private MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
* @author Stephen Oakey
* @author Dominique Villard
* @author Nicolas Ristock
* @author Eugene Gusev
*
* @since 1.4
*/
Expand Down Expand Up @@ -296,6 +297,11 @@ public class AmqpAppender extends AppenderBase<ILoggingEvent> {
*/
private String charset;

/**
* Whether or not add MDC properties into message headers. true by default for backward compatibility
*/
private boolean addMdcAsHeaders = true;

private boolean durable = true;

private MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT;
Expand Down Expand Up @@ -534,6 +540,14 @@ public void setMaxSenderRetries(int maxSenderRetries) {
this.maxSenderRetries = maxSenderRetries;
}

public boolean isAddMdcAsHeaders() {
return this.addMdcAsHeaders;
}

public void setAddMdcAsHeaders(boolean addMdcAsHeaders) {
this.addMdcAsHeaders = addMdcAsHeaders;
}

public boolean isDurable() {
return this.durable;
}
Expand Down Expand Up @@ -788,6 +802,54 @@ else if ("headers".equals(this.exchangeType)) {
}
}

protected MessageProperties prepareMessageProperties(Event event) {
ILoggingEvent logEvent = event.getEvent();

String name = logEvent.getLoggerName();
Level level = logEvent.getLevel();

MessageProperties amqpProps = new MessageProperties();
amqpProps.setDeliveryMode(this.deliveryMode);
amqpProps.setContentType(this.contentType);
if (null != this.contentEncoding) {
amqpProps.setContentEncoding(this.contentEncoding);
}
amqpProps.setHeader(CATEGORY_NAME, name);
amqpProps.setHeader(THREAD_NAME, logEvent.getThreadName());
amqpProps.setHeader(CATEGORY_LEVEL, level.toString());
if (this.generateId) {
amqpProps.setMessageId(UUID.randomUUID().toString());
}

// Set timestamp
Calendar tstamp = Calendar.getInstance();
tstamp.setTimeInMillis(logEvent.getTimeStamp());
amqpProps.setTimestamp(tstamp.getTime());

// Copy properties in from MDC
if (this.addMdcAsHeaders) {
Map<String, String> props = event.getProperties();
Set<Entry<String, String>> entrySet = props.entrySet();
for (Entry<String, String> entry : entrySet) {
amqpProps.setHeader(entry.getKey(), entry.getValue());
}
}

String[] location = this.locationLayout.doLayout(logEvent).split("\\|");
if (!"?".equals(location[0])) {
amqpProps.setHeader(
"location",
String.format("%s.%s()[%s]", location[0], location[1], location[2]));
}

// Set applicationId, if we're using one
if (this.applicationId != null) {
amqpProps.setAppId(this.applicationId);
}

return amqpProps;
}

/**
* Subclasses may modify the final message before sending.
* @param message The message.
Expand All @@ -810,50 +872,14 @@ public void run() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(AmqpAppender.this.connectionFactory);
while (true) {
final Event event = AmqpAppender.this.events.take();
ILoggingEvent logEvent = event.getEvent();

String name = logEvent.getLoggerName();
Level level = logEvent.getLevel();

MessageProperties amqpProps = new MessageProperties();
amqpProps.setDeliveryMode(AmqpAppender.this.deliveryMode);
amqpProps.setContentType(AmqpAppender.this.contentType);
if (null != AmqpAppender.this.contentEncoding) {
amqpProps.setContentEncoding(AmqpAppender.this.contentEncoding);
}
amqpProps.setHeader(CATEGORY_NAME, name);
amqpProps.setHeader(THREAD_NAME, logEvent.getThreadName());
amqpProps.setHeader(CATEGORY_LEVEL, level.toString());
if (AmqpAppender.this.generateId) {
amqpProps.setMessageId(UUID.randomUUID().toString());
}

// Set timestamp
Calendar tstamp = Calendar.getInstance();
tstamp.setTimeInMillis(logEvent.getTimeStamp());
amqpProps.setTimestamp(tstamp.getTime());

// Copy properties in from MDC
Map<String, String> props = event.getProperties();
Set<Entry<String, String>> entrySet = props.entrySet();
for (Entry<String, String> entry : entrySet) {
amqpProps.setHeader(entry.getKey(), entry.getValue());
}
String[] location = AmqpAppender.this.locationLayout.doLayout(logEvent).split("\\|");
if (!"?".equals(location[0])) {
amqpProps.setHeader(
"location",
String.format("%s.%s()[%s]", location[0], location[1], location[2]));
}
String routingKey = AmqpAppender.this.routingKeyLayout.doLayout(logEvent);
// Set applicationId, if we're using one
if (AmqpAppender.this.applicationId != null) {
amqpProps.setAppId(AmqpAppender.this.applicationId);
}

MessageProperties amqpProps = prepareMessageProperties(event);

String routingKey = AmqpAppender.this.routingKeyLayout.doLayout(event.getEvent());

sendOneEncoderPatternMessage(rabbitTemplate, routingKey);

doSend(rabbitTemplate, event, logEvent, name, amqpProps, routingKey);
doSend(rabbitTemplate, event, event.getEvent(), name, amqpProps, routingKey);
}
}
catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
* @author Artem Bilan
* @author Dominique Villard
* @author Nicolas Ristock
* @author Eugene Gusev
*
* @since 1.6
*/
Expand Down Expand Up @@ -154,6 +155,8 @@ public void testProperties() {
assertThat(TestUtils.getPropertyValue(manager, "maxSenderRetries")).isEqualTo(5);
// change the property to true and this fails and test() randomly fails too.
assertThat(TestUtils.getPropertyValue(manager, "async", Boolean.class)).isFalse();
// default value
assertThat(TestUtils.getPropertyValue(manager, "addMdcAsHeaders", Boolean.class)).isTrue();

assertThat(TestUtils.getPropertyValue(appender, "events.items", Object[].class).length).isEqualTo(10);

Expand All @@ -168,6 +171,10 @@ public void testAmqpAppenderEventQueueTypeDefaultsToLinkedBlockingQueue() {
Map.class).get("rabbitmq_default_queue");

Object events = TestUtils.getPropertyValue(appender, "events");

Object manager = TestUtils.getPropertyValue(appender, "manager");
assertThat(TestUtils.getPropertyValue(manager, "addMdcAsHeaders", Boolean.class)).isTrue();

assertThat(events.getClass()).isEqualTo(LinkedBlockingQueue.class);
}

Expand All @@ -177,13 +184,15 @@ public void testUriProperties() {
AmqpAppender appender = (AmqpAppender) TestUtils.getPropertyValue(logger, "context.configuration.appenders",
Map.class).get("rabbitmq_uri");
Object manager = TestUtils.getPropertyValue(appender, "manager");
assertThat(TestUtils.getPropertyValue(manager, "uri").toString()).isEqualTo("amqp://guest:guest@localhost:5672/");
assertThat(TestUtils.getPropertyValue(manager, "uri").toString())
.isEqualTo("amqp://guest:guest@localhost:5672/");

assertThat(TestUtils.getPropertyValue(manager, "host")).isNull();
assertThat(TestUtils.getPropertyValue(manager, "port")).isNull();
assertThat(TestUtils.getPropertyValue(manager, "username")).isNull();
assertThat(TestUtils.getPropertyValue(manager, "password")).isNull();
assertThat(TestUtils.getPropertyValue(manager, "virtualHost")).isNull();
assertThat(TestUtils.getPropertyValue(manager, "addMdcAsHeaders", Boolean.class)).isFalse();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.amqp.rabbit.logback;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -52,6 +53,7 @@
/**
* @author Artem Bilan
* @author Nicolas Ristock
* @author Eugene Gusev
*
* @since 1.4
*/
Expand All @@ -75,15 +77,20 @@ public class AmqpAppenderIntegrationTests {
@Autowired
private Queue encodedQueue;

@Autowired
private Queue testQueue;

private SimpleMessageListenerContainer listenerContainer;

@Before
public void setUp() throws Exception {
listenerContainer = applicationContext.getBean(SimpleMessageListenerContainer.class);
public void setUp() {
this.listenerContainer = this.applicationContext.getBean(SimpleMessageListenerContainer.class);
MDC.clear();
}

@After
public void tearDown() {
MDC.clear();
listenerContainer.shutdown();
}

Expand Down Expand Up @@ -125,7 +132,9 @@ public void testAppenderWithProps() throws InterruptedException {
Object location = messageProperties.getHeaders().get("location");
assertThat(location).isNotNull();
assertThat(location).isInstanceOf(String.class);
assertThat((String) location).startsWith("org.springframework.amqp.rabbit.logback.AmqpAppenderIntegrationTests.testAppenderWithProps()");
assertThat((String) location)
.startsWith(
"org.springframework.amqp.rabbit.logback.AmqpAppenderIntegrationTests.testAppenderWithProps()");
Object threadName = messageProperties.getHeaders().get("thread");
assertThat(threadName).isNotNull();
assertThat(threadName).isInstanceOf(String.class);
Expand Down Expand Up @@ -177,6 +186,32 @@ public void customQueueIsUsedIfProvided() throws Exception {
verify(appenderQueue).add(argThat(arg -> arg.getEvent().getMessage().equals(testMessage)));
}

@Test
public void testAddMdcAsHeaders() {
this.applicationContext.getBean(SingleConnectionFactory.class).createConnection().close();

Logger logWithMdc = (Logger) LoggerFactory.getLogger("withMdc");
Logger logWithoutMdc = (Logger) LoggerFactory.getLogger("withoutMdc");
MDC.put("mdc1", "test1");
MDC.put("mdc2", "test2");

logWithMdc.info("test message with MDC in headers");
Message received1 = this.template.receive(this.testQueue.getName());

assertThat(received1).isNotNull();
assertThat(new String(received1.getBody())).isEqualTo("test message with MDC in headers");
assertThat(received1.getMessageProperties().getHeaders())
.contains(entry("mdc1", "test1"), entry("mdc1", "test1"));

logWithoutMdc.info("test message without MDC in headers");
Message received2 = this.template.receive(this.testQueue.getName());

assertThat(received2).isNotNull();
assertThat(new String(received2.getBody())).isEqualTo("test message without MDC in headers");
assertThat(received2.getMessageProperties().getHeaders())
.doesNotContain(entry("mdc1", "test1"), entry("mdc1", "test1"));
}

public static class EnhancedAppender extends AmqpAppender {

private String foo;
Expand Down
6 changes: 4 additions & 2 deletions spring-rabbit/src/test/resources/log4j2-amqp-appender.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
addresses="localhost:5672"
host="localhost" port="5672" user="guest" password="guest" applicationId="testAppId" charset="UTF-8"
routingKeyPattern="%X{applicationId}.%c.%p"
exchange="log4j2Test_default_queue" deliveryMode="NON_PERSISTENT">
exchange="log4j2Test_default_queue" deliveryMode="NON_PERSISTENT"
addMdcAsHeaders="true">
</RabbitMQ>
<RabbitMQ name="rabbitmq_uri"
uri="amqp://guest:guest@localhost:5672/"
Expand All @@ -32,7 +33,8 @@
contentType="text/plain" contentEncoding="UTF-8" generateId="true" deliveryMode="NON_PERSISTENT"
charset="UTF-8"
clientConnectionProperties="bar:foo,baz:qux"
senderPoolSize="3" maxSenderRetries="5">
senderPoolSize="3" maxSenderRetries="5"
addMdcAsHeaders="false">
</RabbitMQ>
</Appenders>
<Loggers>
Expand Down
Loading

0 comments on commit 925b8c6

Please sign in to comment.