diff --git a/graylog2-server/src/main/java/org/graylog/plugins/netflow/transport/NetflowMessageAggregationHandler.java b/graylog2-server/src/main/java/org/graylog/plugins/netflow/transport/NetflowMessageAggregationHandler.java index fb4213d733ba..08f82855f6e9 100644 --- a/graylog2-server/src/main/java/org/graylog/plugins/netflow/transport/NetflowMessageAggregationHandler.java +++ b/graylog2-server/src/main/java/org/graylog/plugins/netflow/transport/NetflowMessageAggregationHandler.java @@ -24,12 +24,12 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.DatagramPacket; import org.graylog.plugins.netflow.codecs.RemoteAddressCodecAggregator; +import org.graylog2.inputs.transports.netty.SenderEnvelope; import org.graylog2.plugin.inputs.codecs.CodecAggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; -import java.net.SocketAddress; public class NetflowMessageAggregationHandler extends SimpleChannelInboundHandler { private static final Logger LOG = LoggerFactory.getLogger(NetflowMessageAggregationHandler.class); @@ -46,7 +46,7 @@ public NetflowMessageAggregationHandler(RemoteAddressCodecAggregator aggregator, @Override protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { - final SocketAddress remoteAddress = msg.sender(); + final InetSocketAddress remoteAddress = msg.sender(); final CodecAggregator.Result result; try (Timer.Context ignored = aggregationTimer.time()) { result = aggregator.addChunk(msg.content(), remoteAddress); @@ -54,7 +54,7 @@ protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throw final ByteBuf completeMessage = result.getMessage(); if (completeMessage != null) { LOG.debug("Message aggregation completion, forwarding {}", completeMessage); - ctx.fireChannelRead(completeMessage); + ctx.fireChannelRead(SenderEnvelope.of(completeMessage, remoteAddress)); } else if (result.isValid()) { LOG.debug("More chunks necessary to complete this message"); } else { diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/transports/UdpTransport.java b/graylog2-server/src/main/java/org/graylog2/inputs/transports/UdpTransport.java index 22c4bca7b1c3..d39065af99c4 100644 --- a/graylog2-server/src/main/java/org/graylog2/inputs/transports/UdpTransport.java +++ b/graylog2-server/src/main/java/org/graylog2/inputs/transports/UdpTransport.java @@ -16,7 +16,6 @@ */ package org.graylog2.inputs.transports; -import com.codahale.metrics.MetricRegistry; import com.github.joschi.jadconfig.util.Size; import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; @@ -37,6 +36,8 @@ import io.netty.util.concurrent.GlobalEventExecutor; import org.graylog2.inputs.transports.netty.DatagramChannelFactory; import org.graylog2.inputs.transports.netty.DatagramPacketHandler; +import org.graylog2.inputs.transports.netty.EnvelopeMessageAggregationHandler; +import org.graylog2.inputs.transports.netty.EnvelopeMessageHandler; import org.graylog2.inputs.transports.netty.EventLoopGroupFactory; import org.graylog2.inputs.transports.netty.NettyTransportType; import org.graylog2.plugin.LocalMetricRegistry; @@ -46,6 +47,7 @@ import org.graylog2.plugin.inputs.MisfireException; import org.graylog2.plugin.inputs.annotations.ConfigClass; import org.graylog2.plugin.inputs.annotations.FactoryClass; +import org.graylog2.plugin.inputs.codecs.CodecAggregator; import org.graylog2.plugin.inputs.transports.NettyTransport; import org.graylog2.plugin.inputs.transports.Transport; import org.graylog2.plugin.inputs.util.ThroughputCounter; @@ -103,6 +105,19 @@ protected LinkedHashMap> getChannelHa return handlers; } + protected LinkedHashMap> getChildChannelHandlers(final MessageInput input) { + final LinkedHashMap> handlerList = new LinkedHashMap<>(getCustomChildChannelHandlers(input)); + + final CodecAggregator aggregator = getAggregator(); + if (aggregator != null) { + LOG.debug("Adding codec aggregator {} to channel pipeline", aggregator); + handlerList.put("codec-aggregator", () -> new EnvelopeMessageAggregationHandler(aggregator, localRegistry)); + } + handlerList.put("envelope-message-handler", () -> new EnvelopeMessageHandler(input)); + + return handlerList; + } + @Override public void launch(final MessageInput input) throws MisfireException { try { diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/MessageAggregationHandler.java b/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/ByteBufMessageAggregationHandler.java similarity index 86% rename from graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/MessageAggregationHandler.java rename to graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/ByteBufMessageAggregationHandler.java index 7b7347a2b0fe..63c12e1f9742 100644 --- a/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/MessageAggregationHandler.java +++ b/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/ByteBufMessageAggregationHandler.java @@ -23,18 +23,17 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.graylog2.plugin.inputs.codecs.CodecAggregator; -import org.graylog2.plugin.inputs.transports.NettyTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MessageAggregationHandler extends SimpleChannelInboundHandler { - private static final Logger LOG = LoggerFactory.getLogger(NettyTransport.class); +public class ByteBufMessageAggregationHandler extends SimpleChannelInboundHandler { + private static final Logger LOG = LoggerFactory.getLogger(ByteBufMessageAggregationHandler.class); private final CodecAggregator aggregator; private final Timer aggregationTimer; private final Meter invalidChunksMeter; - public MessageAggregationHandler(CodecAggregator aggregator, MetricRegistry metricRegistry) { + public ByteBufMessageAggregationHandler(CodecAggregator aggregator, MetricRegistry metricRegistry) { this.aggregator = aggregator; aggregationTimer = metricRegistry.timer("aggregationTime"); invalidChunksMeter = metricRegistry.meter("invalidMessages"); diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/DatagramPacketHandler.java b/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/DatagramPacketHandler.java index 875d0f77435e..6d75522ca413 100644 --- a/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/DatagramPacketHandler.java +++ b/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/DatagramPacketHandler.java @@ -16,7 +16,6 @@ */ package org.graylog2.inputs.transports.netty; -import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramPacket; @@ -31,7 +30,6 @@ public class DatagramPacketHandler extends MessageToMessageDecoder out) throws Exception { - final ByteBuf content = msg.content(); - out.add(ReferenceCountUtil.retain(content)); + out.add(ReferenceCountUtil.retain(SenderEnvelope.of(msg.content(), msg.sender()))); } } diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/EnvelopeMessageAggregationHandler.java b/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/EnvelopeMessageAggregationHandler.java new file mode 100644 index 000000000000..5945971ce16e --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/EnvelopeMessageAggregationHandler.java @@ -0,0 +1,62 @@ +/** + * This file is part of Graylog. + * + * Graylog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog. If not, see . + */ +package org.graylog2.inputs.transports.netty; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import io.netty.buffer.ByteBuf; +import io.netty.channel.AddressedEnvelope; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.graylog2.plugin.inputs.codecs.CodecAggregator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; + +public class EnvelopeMessageAggregationHandler extends SimpleChannelInboundHandler> { + private static final Logger LOG = LoggerFactory.getLogger(EnvelopeMessageAggregationHandler.class); + + private final CodecAggregator aggregator; + private final Timer aggregationTimer; + private final Meter invalidChunksMeter; + + public EnvelopeMessageAggregationHandler(CodecAggregator aggregator, MetricRegistry metricRegistry) { + this.aggregator = aggregator; + aggregationTimer = metricRegistry.timer("aggregationTime"); + invalidChunksMeter = metricRegistry.meter("invalidMessages"); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, AddressedEnvelope envelope) throws Exception { + final CodecAggregator.Result result; + try (Timer.Context ignored = aggregationTimer.time()) { + result = aggregator.addChunk(envelope.content()); + } + final ByteBuf completeMessage = result.getMessage(); + if (completeMessage != null) { + LOG.debug("Message aggregation completion, forwarding {}", completeMessage); + ctx.fireChannelRead(SenderEnvelope.of(completeMessage, envelope.sender())); + } else if (result.isValid()) { + LOG.debug("More chunks necessary to complete this message"); + } else { + invalidChunksMeter.mark(); + LOG.debug("Message chunk was not valid and discarded."); + } + } +} \ No newline at end of file diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/EnvelopeMessageHandler.java b/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/EnvelopeMessageHandler.java new file mode 100644 index 000000000000..63773687d622 --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/EnvelopeMessageHandler.java @@ -0,0 +1,55 @@ +/** + * This file is part of Graylog. + * + * Graylog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog. If not, see . + */ +package org.graylog2.inputs.transports.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.AddressedEnvelope; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.graylog2.plugin.inputs.MessageInput; +import org.graylog2.plugin.inputs.transports.NettyTransport; +import org.graylog2.plugin.journal.RawMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; + +public class EnvelopeMessageHandler extends SimpleChannelInboundHandler> { + private static final Logger LOG = LoggerFactory.getLogger(NettyTransport.class); + + private final MessageInput input; + + public EnvelopeMessageHandler(MessageInput input) { + this.input = input; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, AddressedEnvelope envelope) throws Exception { + final ByteBuf msg = envelope.content(); + final byte[] bytes = new byte[msg.readableBytes()]; + msg.readBytes(bytes); + final RawMessage raw = new RawMessage(bytes, envelope.sender()); + input.processRawMessage(raw); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + LOG.debug("Could not handle message, closing connection: {}", cause); + ctx.channel().close(); + super.exceptionCaught(ctx, cause); + } +} diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/SenderEnvelope.java b/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/SenderEnvelope.java new file mode 100644 index 000000000000..ff7bf852820c --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/SenderEnvelope.java @@ -0,0 +1,40 @@ +/** + * This file is part of Graylog. + * + * Graylog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog. If not, see . + */ +package org.graylog2.inputs.transports.netty; + +import io.netty.channel.AddressedEnvelope; +import io.netty.channel.DefaultAddressedEnvelope; + +import java.net.InetSocketAddress; + +/** + * Helper class to simplify envelope creation. + */ +public class SenderEnvelope { + /** + * Returns a {@link AddressedEnvelope} of the given message and message sender values. + * + * @param message the message + * @param sender the sender address + * @param message type + * @param sender type + * @return the envelope + */ + public static AddressedEnvelope of(M message, A sender) { + return new DefaultAddressedEnvelope<>(message, null, sender); + } +} diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/inputs/transports/AbstractTcpTransport.java b/graylog2-server/src/main/java/org/graylog2/plugin/inputs/transports/AbstractTcpTransport.java index 79959e787d66..feb5bef50395 100644 --- a/graylog2-server/src/main/java/org/graylog2/plugin/inputs/transports/AbstractTcpTransport.java +++ b/graylog2-server/src/main/java/org/graylog2/plugin/inputs/transports/AbstractTcpTransport.java @@ -17,7 +17,6 @@ package org.graylog2.plugin.inputs.transports; import com.codahale.metrics.Gauge; -import com.codahale.metrics.MetricRegistry; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import io.netty.bootstrap.ServerBootstrap; @@ -44,6 +43,8 @@ import org.graylog2.inputs.transports.netty.ChannelRegistrationHandler; import org.graylog2.inputs.transports.netty.EventLoopGroupFactory; import org.graylog2.inputs.transports.netty.ExceptionLoggingChannelHandler; +import org.graylog2.inputs.transports.netty.ByteBufMessageAggregationHandler; +import org.graylog2.inputs.transports.netty.RawMessageHandler; import org.graylog2.inputs.transports.netty.ServerSocketChannelFactory; import org.graylog2.plugin.LocalMetricRegistry; import org.graylog2.plugin.configuration.Configuration; @@ -55,6 +56,7 @@ import org.graylog2.plugin.inputs.MessageInput; import org.graylog2.plugin.inputs.MisfireException; import org.graylog2.plugin.inputs.annotations.ConfigClass; +import org.graylog2.plugin.inputs.codecs.CodecAggregator; import org.graylog2.plugin.inputs.transports.util.KeyUtil; import org.graylog2.plugin.inputs.util.ConnectionCounter; import org.graylog2.plugin.inputs.util.ThroughputCounter; @@ -75,7 +77,6 @@ import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.Locale; -import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -227,6 +228,7 @@ public void stop() { @Override protected LinkedHashMap> getChildChannelHandlers(MessageInput input) { final LinkedHashMap> handlers = new LinkedHashMap<>(); + final CodecAggregator aggregator = getAggregator(); handlers.put("channel-registration", () -> new ChannelRegistrationHandler(childChannels)); handlers.put("traffic-counter", () -> throughputCounter); @@ -235,7 +237,12 @@ protected LinkedHashMap> getChildChan LOG.info("Enabled TLS for input [{}/{}]. key-file=\"{}\" cert-file=\"{}\"", input.getName(), input.getId(), tlsKeyFile, tlsCertFile); handlers.put("tls", getSslHandlerCallable(input)); } - handlers.putAll(super.getChildChannelHandlers(input)); + handlers.putAll(getCustomChildChannelHandlers(input)); + if (aggregator != null) { + LOG.debug("Adding codec aggregator {} to channel pipeline", aggregator); + handlers.put("codec-aggregator", () -> new ByteBufMessageAggregationHandler(aggregator, localRegistry)); + } + handlers.put("rawmessage-handler", () -> new RawMessageHandler(input)); handlers.put("exception-logger", () -> new ExceptionLoggingChannelHandler(input, LOG)); return handlers; diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/inputs/transports/NettyTransport.java b/graylog2-server/src/main/java/org/graylog2/plugin/inputs/transports/NettyTransport.java index b086db2d5b30..d88e3d62b1f0 100644 --- a/graylog2-server/src/main/java/org/graylog2/plugin/inputs/transports/NettyTransport.java +++ b/graylog2-server/src/main/java/org/graylog2/plugin/inputs/transports/NettyTransport.java @@ -25,7 +25,6 @@ import io.netty.channel.ChannelPipeline; import org.graylog2.inputs.transports.netty.EventLoopGroupFactory; import org.graylog2.inputs.transports.netty.ExceptionLoggingChannelHandler; -import org.graylog2.inputs.transports.netty.MessageAggregationHandler; import org.graylog2.inputs.transports.netty.PromiseFailureHandler; import org.graylog2.inputs.transports.netty.RawMessageHandler; import org.graylog2.plugin.LocalMetricRegistry; @@ -176,17 +175,7 @@ protected LinkedHashMap> getCustomChi * @return list of custom {@link ChannelHandler channel handlers} to add to the Netty {@link ChannelPipeline channel pipeline} for child channels * @see #getCustomChildChannelHandlers(MessageInput) */ - protected LinkedHashMap> getChildChannelHandlers(final MessageInput input) { - final LinkedHashMap> handlerList = new LinkedHashMap<>(getCustomChildChannelHandlers(input)); - - if (aggregator != null) { - log.debug("Adding codec aggregator {} to channel pipeline", aggregator); - handlerList.put("codec-aggregator", () -> new MessageAggregationHandler(aggregator, localRegistry)); - } - handlerList.put("rawmessage-handler", () -> new RawMessageHandler(input)); - - return handlerList; - } + protected abstract LinkedHashMap> getChildChannelHandlers(final MessageInput input); protected int getRecvBufferSize() { return recvBufferSize;