Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to Netty 4.1 #4397

Merged
merged 14 commits into from
Jan 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/forbidden-apis/netty3.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.jboss.netty.** @ Migrate to Netty 4.x
27 changes: 21 additions & 6 deletions graylog-project-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,25 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-bom</artifactId>
<version>${netty.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty-tcnative-boringssl-static.version}</version>
<classifier>osx-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty-tcnative-boringssl-static.version}</version>
<classifier>linux-x86_64</classifier>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
Expand Down Expand Up @@ -174,11 +193,6 @@
<artifactId>reflections</artifactId>
<version>${reflections.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
Expand Down Expand Up @@ -626,7 +640,7 @@
<!-- Use ALL the cores! -->
<forkCount>1C</forkCount>
<reuseForks>false</reuseForks>
<argLine>-Djava.library.path=${project.basedir}/../lib/sigar-${sigar.version}</argLine>
<argLine>-Djava.library.path=${project.basedir}/../lib/sigar-${sigar.version} -Dio.netty.leakDetectionLevel=paranoid</argLine>
<excludes>
<exclude>**/*IntegrationTest.java</exclude>
<exclude>**/*IT.java</exclude>
Expand Down Expand Up @@ -659,6 +673,7 @@
<!--bundledSignature>commons-io-unsafe-${commons-io.version}</bundledSignature-->
</bundledSignatures>
<signaturesFiles>
<signaturesFile>${project.basedir}/../config/forbidden-apis/netty3.txt</signaturesFile>
<signaturesFile>${project.basedir}/../config/forbidden-apis/signatures.txt</signaturesFile>
</signaturesFiles>
</configuration>
Expand Down
38 changes: 37 additions & 1 deletion graylog2-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,43 @@

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<artifactId>netty-common</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<classifier>osx-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import com.google.inject.Module;
import com.google.inject.name.Names;
import com.google.inject.spi.Message;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
Expand All @@ -61,8 +63,6 @@
import org.graylog2.shared.plugins.ChainingClassLoader;
import org.graylog2.shared.plugins.PluginLoader;
import org.graylog2.shared.utilities.ExceptionUtils;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Slf4JLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.graylog2.plugin.system.NodeId;
import org.graylog2.shared.bindings.GenericBindings;
import org.graylog2.shared.bindings.GenericInitializerBindings;
import org.graylog2.shared.bindings.MessageInputBindings;
import org.graylog2.shared.bindings.SchedulerBindings;
import org.graylog2.shared.bindings.ServerStatusBindings;
import org.graylog2.shared.bindings.SharedPeriodicalBindings;
Expand Down Expand Up @@ -199,7 +198,6 @@ protected List<Module> getSharedBindingsModules() {
result.add(new SharedPeriodicalBindings());
result.add(new SchedulerBindings());
result.add(new GenericInitializerBindings());
result.add(new MessageInputBindings());
result.add(new SystemStatsModule(configuration.isDisableSigar()));

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.graylog2.indexer.IndexerBindings;
import org.graylog2.indexer.retention.RetentionStrategyBindings;
import org.graylog2.indexer.rotation.RotationStrategyBindings;
import org.graylog2.inputs.transports.NettyTransportConfiguration;
import org.graylog2.messageprocessors.MessageProcessorModule;
import org.graylog2.migrations.MigrationsModule;
import org.graylog2.notifications.Notification;
Expand All @@ -63,6 +64,7 @@
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.system.NodeId;
import org.graylog2.shared.UI;
import org.graylog2.shared.bindings.MessageInputBindings;
import org.graylog2.shared.bindings.ObjectMapperModule;
import org.graylog2.shared.bindings.RestApiBindings;
import org.graylog2.shared.system.activities.Activity;
Expand Down Expand Up @@ -93,6 +95,7 @@ public class Server extends ServerBootstrap {
private final MongoDbConfiguration mongoDbConfiguration = new MongoDbConfiguration();
private final VersionCheckConfiguration versionCheckConfiguration = new VersionCheckConfiguration();
private final KafkaJournalConfiguration kafkaJournalConfiguration = new KafkaJournalConfiguration();
private final NettyTransportConfiguration nettyTransportConfiguration = new NettyTransportConfiguration();

public Server() {
super("server", configuration);
Expand All @@ -117,6 +120,7 @@ protected List<Module> getCommandBindings() {
new MessageProcessorModule(),
new AlarmCallbackBindings(),
new InitializerBindings(),
new MessageInputBindings(),
new MessageOutputBindings(configuration, chainingClassLoader),
new RotationStrategyBindings(),
new RetentionStrategyBindings(),
Expand Down Expand Up @@ -145,7 +149,8 @@ protected List<Object> getCommandConfigurationBeans() {
emailConfiguration,
mongoDbConfiguration,
versionCheckConfiguration,
kafkaJournalConfiguration);
kafkaJournalConfiguration,
nettyTransportConfiguration);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.graylog2.inputs.codecs.gelf.GELFMessage;
import org.graylog2.inputs.codecs.gelf.GELFMessageChunk;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,13 +85,13 @@ public GelfChunkAggregator(@Named("daemonScheduler") ScheduledExecutorService sc

@Nonnull
@Override
public Result addChunk(ChannelBuffer buffer) {
public Result addChunk(ByteBuf buffer) {
final byte[] readable = new byte[buffer.readableBytes()];
buffer.toByteBuffer().get(readable, buffer.readerIndex(), buffer.readableBytes());
buffer.readBytes(readable, buffer.readerIndex(), buffer.readableBytes());

final GELFMessage msg = new GELFMessage(readable);

final ChannelBuffer aggregatedBuffer;
final ByteBuf aggregatedBuffer;
switch (msg.getGELFType()) {
case CHUNKED:
try {
Expand All @@ -108,7 +108,7 @@ public Result addChunk(ChannelBuffer buffer) {
case ZLIB:
case GZIP:
case UNCOMPRESSED:
aggregatedBuffer = buffer;
aggregatedBuffer = Unpooled.wrappedBuffer(readable);
break;
case UNSUPPORTED:
return INVALID_RESULT;
Expand All @@ -127,7 +127,7 @@ public Result addChunk(ChannelBuffer buffer) {
* @return null or a {@link org.graylog2.plugin.journal.RawMessage raw message} object
*/
@Nullable
private ChannelBuffer checkForCompletion(GELFMessage gelfMessage) {
private ByteBuf checkForCompletion(GELFMessage gelfMessage) {
if (!chunks.isEmpty() && log.isDebugEnabled()) {
log.debug("Dumping GELF chunk map [chunks for {} messages]:\n{}", chunks.size(), humanReadableChunkMap());
}
Expand Down Expand Up @@ -178,7 +178,7 @@ private ChannelBuffer checkForCompletion(GELFMessage gelfMessage) {
}
}
completeMessages.inc();
return ChannelBuffers.wrappedBuffer(allChunks);
return Unpooled.wrappedBuffer(allChunks);
}

// message isn't complete yet, check if we should remove the other parts as well
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
*/
package org.graylog2.inputs.codecs.gelf;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.inputs.MessageInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

public final class GELFMessageChunk {

Expand Down Expand Up @@ -54,14 +55,14 @@ public final class GELFMessageChunk {
private int sequenceCount = -1;
private long arrival = -1L;

private final ChannelBuffer payload;
private final ByteBuf payload;
private final MessageInput sourceInput;

public GELFMessageChunk(final byte[] payload, MessageInput sourceInput) {
if (payload.length < HEADER_TOTAL_LENGTH) {
throw new IllegalArgumentException("This GELF message chunk is too short. Cannot even contain the required header.");
}
this.payload = ChannelBuffers.wrappedBuffer(payload);
this.payload = Unpooled.wrappedBuffer(payload);
this.sourceInput = sourceInput;
read();
}
Expand Down Expand Up @@ -108,12 +109,10 @@ private void read() {
this.arrival = Tools.nowUTC().getMillis();
}

private String extractId() {
private void extractId() {
if (this.id == null) {
this.id = ChannelBuffers.hexDump(payload, HEADER_PART_HASH_START, HEADER_PART_HASH_LENGTH);
this.id = ByteBufUtil.hexDump(payload, HEADER_PART_HASH_START, HEADER_PART_HASH_LENGTH);
}

return this.id;
}

// lol duplication
Expand Down Expand Up @@ -152,20 +151,10 @@ private void extractData() {

@Override
public String toString() {
final StringBuilder sb = new StringBuilder();

sb.append("ID: ");
sb.append(this.id);
sb.append("\tSequence: ");
sb.append(this.sequenceNumber + 1); // +1 for readability: 1/2 not 0/2
sb.append("/");
sb.append(this.sequenceCount);
sb.append("\tArrival: ");
sb.append(this.arrival);
sb.append("\tData size: ");
sb.append(this.payload.readableBytes());

return sb.toString();
return "ID: " + this.id +
"\tSequence: " + (this.sequenceNumber + 1) + // +1 for readability: 1/2 not 0/2
"/" + this.sequenceCount +
"\tArrival: " + this.arrival +
"\tData size: " + this.payload.readableBytes();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,30 @@
*/
package org.graylog2.inputs.syslog.tcp;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.ByteProcessor;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
* Implements a Netty {@link FrameDecoder} for the Syslog octet counting framing. (RFC6587)
* Implements a Netty {@link ByteToMessageDecoder} for the Syslog octet counting framing. (RFC6587)
*
* @see <a href="http://tools.ietf.org/html/rfc6587#section-3.4.1">RFC6587 Octet Counting</a>
*/
public class SyslogOctetCountFrameDecoder extends FrameDecoder {
public class SyslogOctetCountFrameDecoder extends ByteToMessageDecoder {
private static final ByteProcessor BYTE_PROCESSOR = value -> value != ' ';

@Override
protected Object decode(final ChannelHandlerContext ctx,
final Channel channel,
final ChannelBuffer buffer) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
final int frameSizeValueLength = findFrameSizeValueLength(buffer);

// We have not found the frame length value byte size yet.
if (frameSizeValueLength <= 0) {
return null;
return;
}

// Convert the frame length value bytes into an integer without mutating the buffer reader index.
Expand All @@ -49,18 +51,14 @@ protected Object decode(final ChannelHandlerContext ctx,
// the buffer has enough data to read the complete message.
if (buffer.readableBytes() - skipLength < length) {
// We cannot read the complete frame yet.
return null;
return;
} else {
// Skip the frame length value bytes and the whitespace that follows it.
buffer.skipBytes(skipLength);
}

final ChannelBuffer frame = extractFrame(buffer, buffer.readerIndex(), length);

// Advance the reader index because extractFrame() does not do that.
buffer.skipBytes(length);

return frame;
final ByteBuf frame = buffer.readRetainedSlice(length);
out.add(frame);
}

/**
Expand All @@ -69,17 +67,14 @@ protected Object decode(final ChannelHandlerContext ctx,
* @param buffer The channel buffer
* @return The length of the frame length value
*/
private int findFrameSizeValueLength(final ChannelBuffer buffer) {
final int n = buffer.writerIndex();
private int findFrameSizeValueLength(final ByteBuf buffer) {
final int readerIndex = buffer.readerIndex();
int index = buffer.forEachByte(BYTE_PROCESSOR);

for (int i = buffer.readerIndex(); i < n; i ++) {
final byte b = buffer.getByte(i);

if (b == ' ') {
return i - buffer.readerIndex();
}
if (index >= 0) {
return index - readerIndex;
} else {
return -1;
}

return -1; // Not found.
}
}
Loading