From 80cb1e37915eea2a7e6e29cdfb288ae02abd22f7 Mon Sep 17 00:00:00 2001 From: Kay Roepke Date: Mon, 2 Jan 2012 18:09:44 +0100 Subject: [PATCH 1/2] switch to JBoss netty for receiving GELF messages this change also adds TCP and HTTP support for GELF messages. by default both are disabled. when sending GELF messages over TCP, delimit each message with a nul-byte after it (also if you only send one message). TCP GELF messages cannot be compressed right now and a have a maximum size of 2 MB HTTP messages should contain exactly one GELF messages, keepalive is not currently supported but forthcoming. --- misc/graylog2.conf | 2 + pom.xml | 5 + src/main/java/org/graylog2/Configuration.java | 31 +++-- src/main/java/org/graylog2/Main.java | 49 ++++--- .../gelf/ChunkedGELFClientHandler.java | 12 +- .../gelf/ChunkedGELFHandler.java | 51 ++++++++ .../messagehandlers/gelf/GELFMainThread.java | 82 ------------ .../messagehandlers/gelf/GELFServer.java | 123 +++++++++++++----- .../messagehandlers/gelf/HttpGELFHandler.java | 68 ++++++++++ .../gelf/SimpleGELFClientHandler.java | 11 +- .../gelf/SimpleGELFHandler.java | 49 +++++++ .../messagehandlers/gelf/TcpGELFHandler.java | 50 +++++++ .../messagehandlers/gelf/UdpGELFHandler.java | 90 +++++++++++++ 13 files changed, 470 insertions(+), 153 deletions(-) create mode 100644 src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFHandler.java delete mode 100644 src/main/java/org/graylog2/messagehandlers/gelf/GELFMainThread.java create mode 100644 src/main/java/org/graylog2/messagehandlers/gelf/HttpGELFHandler.java create mode 100644 src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFHandler.java create mode 100644 src/main/java/org/graylog2/messagehandlers/gelf/TcpGELFHandler.java create mode 100644 src/main/java/org/graylog2/messagehandlers/gelf/UdpGELFHandler.java diff --git a/misc/graylog2.conf b/misc/graylog2.conf index f4aee9302c97..295c0bb1682b 100644 --- a/misc/graylog2.conf +++ b/misc/graylog2.conf @@ -47,6 +47,8 @@ mongodb_threads_allowed_to_block_multiplier = 5 use_gelf = true gelf_listen_address = 0.0.0.0 gelf_listen_port = 12201 +use_gelf_tcp = false +# gelf_http_listen_port = 12202 # Drools Rule File (Use to rewrite incoming log messages) # rules_file = /etc/graylog2.d/rules/graylog2.drl diff --git a/pom.xml b/pom.xml index 81ee5c287b79..49c352d2290c 100644 --- a/pom.xml +++ b/pom.xml @@ -82,6 +82,11 @@ slf4j-log4j12 1.6.1 + + org.jboss.netty + netty + 3.2.7.Final + diff --git a/src/main/java/org/graylog2/Configuration.java b/src/main/java/org/graylog2/Configuration.java index eb3d71b7b078..53c5945beac0 100644 --- a/src/main/java/org/graylog2/Configuration.java +++ b/src/main/java/org/graylog2/Configuration.java @@ -20,6 +20,15 @@ package org.graylog2; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.log4j.Logger; +import org.graylog2.messagehandlers.amqp.AMQPSubscribedQueue; +import org.graylog2.messagehandlers.amqp.InvalidQueueTypeException; + import com.github.joschi.jadconfig.Parameter; import com.github.joschi.jadconfig.ValidationException; import com.github.joschi.jadconfig.ValidatorMethod; @@ -28,14 +37,6 @@ import com.github.joschi.jadconfig.validators.PositiveIntegerValidator; import com.github.joschi.jadconfig.validators.PositiveLongValidator; import com.mongodb.ServerAddress; -import org.apache.log4j.Logger; -import org.graylog2.messagehandlers.amqp.AMQPSubscribedQueue; -import org.graylog2.messagehandlers.amqp.InvalidQueueTypeException; - -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; /** * Helper class to hold configuration of Graylog2 @@ -113,6 +114,12 @@ public class Configuration { @Parameter(value = "gelf_listen_port", required = true, validator = InetPortValidator.class) private int gelfListenPort = 12201; + @Parameter(value = "use_gelf_tcp") + private boolean useGelfTcp = false; + + @Parameter(value = "gelf_http_listen_port") + private int gelfHttpListenPort = 0; + @Parameter("amqp_enabled") private boolean amqpEnabled = false; @@ -231,6 +238,14 @@ public int getGelfListenPort() { return gelfListenPort; } + public boolean isUseGelfTcp() { + return useGelfTcp; + } + + public int getGelfHttpListenPort() { + return gelfHttpListenPort; + } + public boolean isAmqpEnabled() { return amqpEnabled; } diff --git a/src/main/java/org/graylog2/Main.java b/src/main/java/org/graylog2/Main.java index 232f7295e056..70f587543ace 100644 --- a/src/main/java/org/graylog2/Main.java +++ b/src/main/java/org/graylog2/Main.java @@ -20,11 +20,15 @@ package org.graylog2; -import com.beust.jcommander.JCommander; -import com.github.joschi.jadconfig.JadConfig; -import com.github.joschi.jadconfig.RepositoryException; -import com.github.joschi.jadconfig.ValidationException; -import com.github.joschi.jadconfig.repositories.PropertiesRepository; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import org.apache.commons.io.IOUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -35,7 +39,7 @@ import org.graylog2.messagehandlers.amqp.AMQPSubscribedQueue; import org.graylog2.messagehandlers.amqp.AMQPSubscriberThread; import org.graylog2.messagehandlers.gelf.ChunkedGELFClientManager; -import org.graylog2.messagehandlers.gelf.GELFMainThread; +import org.graylog2.messagehandlers.gelf.GELFServer; import org.graylog2.messagehandlers.syslog.SyslogServerThread; import org.graylog2.messagequeue.MessageQueue; import org.graylog2.messagequeue.MessageQueueFlusher; @@ -46,14 +50,11 @@ import org.graylog2.periodical.MessageRetentionThread; import org.graylog2.periodical.ServerValueWriterThread; -import java.io.FileWriter; -import java.io.IOException; -import java.io.Writer; -import java.net.InetSocketAddress; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import com.beust.jcommander.JCommander; +import com.github.joschi.jadconfig.JadConfig; +import com.github.joschi.jadconfig.RepositoryException; +import com.github.joschi.jadconfig.ValidationException; +import com.github.joschi.jadconfig.repositories.PropertiesRepository; /** * Main class of Graylog2. @@ -168,7 +169,10 @@ public static void main(String[] args) { // Start GELF threads if (configuration.isUseGELF()) { - initializeGELFThreads(configuration.getGelfListenAddress(), configuration.getGelfListenPort(), scheduler); + initializeGELFThreads(configuration.getGelfListenAddress(), + configuration.getGelfListenPort(), + configuration.isUseGelfTcp(), + configuration.getGelfHttpListenPort(), scheduler); } // Initialize AMQP Broker if enabled @@ -231,13 +235,18 @@ private static void initializeMessageRetentionThread(ScheduledExecutorService sc LOG.info("Retention time management active."); } - private static void initializeGELFThreads(String gelfAddress, int gelfPort, ScheduledExecutorService scheduler) { - GELFMainThread gelfThread = new GELFMainThread(new InetSocketAddress(gelfAddress, gelfPort)); - gelfThread.start(); + private static void initializeGELFThreads(String gelfAddress, int gelfPort, boolean useTcp, int httpListenPort, ScheduledExecutorService scheduler) { - scheduler.scheduleAtFixedRate(new ChunkedGELFClientManagerThread(ChunkedGELFClientManager.getInstance()), ChunkedGELFClientManagerThread.INITIAL_DELAY, ChunkedGELFClientManagerThread.PERIOD, TimeUnit.SECONDS); + if (GELFServer.create(new InetSocketAddress(gelfAddress, gelfPort), useTcp, httpListenPort)) { + scheduler.scheduleAtFixedRate(new ChunkedGELFClientManagerThread( + ChunkedGELFClientManager.getInstance()), + ChunkedGELFClientManagerThread.INITIAL_DELAY, + ChunkedGELFClientManagerThread.PERIOD, TimeUnit.SECONDS); + LOG.info("GELF threads started"); + } else { + LOG.error("GELF threads could not be started."); + } - LOG.info("GELF threads started"); } private static void initializeSyslogServer(String syslogProtocol, int syslogPort) { diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFClientHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFClientHandler.java index 69ba003d9578..f342eb53eaee 100644 --- a/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFClientHandler.java +++ b/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFClientHandler.java @@ -20,6 +20,10 @@ package org.graylog2.messagehandlers.gelf; +import java.io.IOException; +import java.net.DatagramPacket; +import java.util.zip.DataFormatException; + import org.apache.log4j.Logger; import org.graylog2.Tools; import org.graylog2.blacklists.Blacklist; @@ -28,10 +32,6 @@ import org.graylog2.messagehandlers.common.MessageCountUpdateHook; import org.graylog2.messagehandlers.common.MessageParserHook; import org.graylog2.messagehandlers.common.ReceiveHookManager; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.util.zip.DataFormatException; import org.graylog2.messagequeue.MessageQueue; /** @@ -109,6 +109,10 @@ public ChunkedGELFClientHandler(DatagramPacket clientMessage) throws GELFExcepti } } + public ChunkedGELFClientHandler(byte[] data) throws GELFException, IOException, DataFormatException { + this(new DatagramPacket(data, data.length)); + } + private void decompress(byte[] data, String hash) throws InvalidGELFCompressionMethodException, IOException { // Determine compression type. int type = GELF.getGELFType(data); diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFHandler.java new file mode 100644 index 000000000000..fb027cd76d09 --- /dev/null +++ b/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFHandler.java @@ -0,0 +1,51 @@ +/** + * Copyright 2012 Kay Roepke + * + * This file is part of Graylog2. + * + * Graylog2 is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog2 is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog2. If not, see . + * + */ + +package org.graylog2.messagehandlers.gelf; + +import org.apache.log4j.Logger; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; + +public class ChunkedGELFHandler extends SimpleChannelUpstreamHandler { + private static final Logger LOG = Logger.getLogger(ChunkedGELFHandler.class); + + @Override + public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) + throws Exception { + final ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); + final byte[] data = new byte[buffer.readableBytes()]; + buffer.getBytes(0, data); + LOG.info("received chunked gelf messages, passing on to handler"); + new ChunkedGELFClientHandler(data).handle(); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e) + throws Exception { + LOG.error("Could not handle chunked GELF message", e.getCause()); + e.getChannel().close(); + } + + +} diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/GELFMainThread.java b/src/main/java/org/graylog2/messagehandlers/gelf/GELFMainThread.java deleted file mode 100644 index d3fefd1e032f..000000000000 --- a/src/main/java/org/graylog2/messagehandlers/gelf/GELFMainThread.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Copyright 2010 Lennart Koopmann - * - * This file is part of Graylog2. - * - * Graylog2 is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Graylog2 is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Graylog2. If not, see . - * - */ - -package org.graylog2.messagehandlers.gelf; - -import org.apache.log4j.Logger; - -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -/** - * GELFMainThread.java: Jun 23, 2010 6:43:21 PM - * - * Thread responsible for listening for GELF messages. - * - * @author Lennart Koopmann - */ -public class GELFMainThread extends Thread { - - private static final Logger LOG = Logger.getLogger(GELFMainThread.class); - - private InetSocketAddress socketAddress; - - private ExecutorService threadPool = Executors.newCachedThreadPool(); - - /** - * Thread responsible for listening for GELF messages. - * - * @param socketAddress The {@link InetSocketAddress} to bind to - */ - public GELFMainThread(InetSocketAddress socketAddress) { - this.socketAddress = socketAddress; - } - - /** - * Run the thread. Runs forever! - */ - @Override public void run() { - GELFServer server = new GELFServer(); - if (!server.create(socketAddress)) { - throw new RuntimeException("Could not start GELF server. Do you have permissions to bind to " + socketAddress + "?"); - } - - // Run forever. - while (true) { - try { - // Listen on socket. - DatagramPacket receivedGelfSentence = server.listen(); - - // Skip empty sentences. - if (receivedGelfSentence.getLength() == 0) { - continue; - } - - // We got a connected client. Start a GELFClientHandlerThread() in our thread pool and wait for next client. - threadPool.execute(new GELFClientHandlerThread(receivedGelfSentence)); - } catch (Exception e) { - LOG.error("Skipping GELF client. Error: " + e.getMessage(), e); - } - } - } - -} diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/GELFServer.java b/src/main/java/org/graylog2/messagehandlers/gelf/GELFServer.java index 1a7cf03da18a..3d06353e4051 100644 --- a/src/main/java/org/graylog2/messagehandlers/gelf/GELFServer.java +++ b/src/main/java/org/graylog2/messagehandlers/gelf/GELFServer.java @@ -20,12 +20,25 @@ package org.graylog2.messagehandlers.gelf; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; import java.net.InetSocketAddress; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.log4j.Logger; +import org.jboss.netty.bootstrap.ConnectionlessBootstrap; +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; +import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; +import org.jboss.netty.handler.codec.frame.Delimiters; +import org.jboss.netty.handler.codec.http.HttpChunkAggregator; +import org.jboss.netty.handler.codec.http.HttpContentCompressor; +import org.jboss.netty.handler.codec.http.HttpRequestDecoder; +import org.jboss.netty.handler.codec.http.HttpResponseEncoder; /** * GELFThread.java: Jun 23, 2010 6:58:07 PM @@ -43,44 +56,86 @@ public class GELFServer { */ public static final int MAX_PACKET_SIZE = 8192; - private DatagramSocket serverSocket = null; - /** - * Create the UDP socket. + * Create the configured servers to accept GELF messages on: UDP and optionally nul-byte delimited TCP, and HTTP. * * @param socketAddress The {@link InetSocketAddress} to bind to - * @return boolean + * @param useTcp true if the TCP server should be started as well + * @param httpListenPort if non-zero the port on which to listen to HTTP requests + * @return boolean true if all requested servers could be bound, false otherwise */ - public boolean create(InetSocketAddress socketAddress) { + public static boolean create(InetSocketAddress socketAddress, boolean useTcp, int httpListenPort) { + final ExecutorService bossThreadPool = Executors.newCachedThreadPool(); + final ExecutorService workerThreadPool = Executors.newCachedThreadPool(); + + // UDP GELF server + + final ConnectionlessBootstrap udpBootstrap = new ConnectionlessBootstrap( + new NioDatagramChannelFactory(workerThreadPool) + ); + udpBootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + return Channels.pipeline(new UdpGELFHandler()); + } + }); + + // TCP GELF server + ServerBootstrap tcpBootstrap = null; + if (useTcp) { + tcpBootstrap = new ServerBootstrap( + new NioServerSocketChannelFactory( + bossThreadPool, workerThreadPool) + ); + tcpBootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline p = Channels.pipeline(); + p.addLast("framer", new DelimiterBasedFrameDecoder(2 * 1024 * 1024, + Delimiters.nulDelimiter())); + p.addLast("handler", new TcpGELFHandler()); + return p; + } + }); + } + + // HTTP GELF server + ServerBootstrap httpBootstrap = null; + if (httpListenPort != 0) { + httpBootstrap = new ServerBootstrap( + new NioServerSocketChannelFactory(bossThreadPool, workerThreadPool) + ); + httpBootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = Channels.pipeline(); + pipeline.addLast("decoder", new HttpRequestDecoder()); + pipeline.addLast("aggregator", new HttpChunkAggregator(1048576)); + pipeline.addLast("encoder", new HttpResponseEncoder()); + pipeline.addLast("deflater", new HttpContentCompressor()); + pipeline.addLast("handler", new HttpGELFHandler()); + + return pipeline; + } + }); + } try { - this.serverSocket = new DatagramSocket(socketAddress); - } catch(IOException e) { - LOG.fatal("Could not create ServerSocket in GELFServer::create(): " + e.getMessage(), e); + udpBootstrap.bind(socketAddress); + LOG.info("Started UDP GELF server on " + socketAddress); + if (useTcp) { + tcpBootstrap.bind(socketAddress); + LOG.info("Started TCP GELF server on " + socketAddress); + } + if (httpListenPort != 0) { + httpBootstrap.bind(new InetSocketAddress(socketAddress.getAddress(), httpListenPort)); + LOG.info("Started HTTP GELF server on " + socketAddress.getAddress() + ":" + httpListenPort); + } + } catch (ChannelException e) { + LOG.fatal("Could not bind GELF server to address " + socketAddress, e); return false; } - LOG.info("Started GELF server on " + socketAddress); - return true; } - /** - * Listens on the formerly created (create()) socket and returns - * unzipped (GZIP) raw message that can be parsed to a GELFMessage. - * - * @return Received message - * @throws IOException - */ - public DatagramPacket listen() throws IOException { - - // Create buffer. - byte[] receiveData = new byte[MAX_PACKET_SIZE]; - DatagramPacket receivedPacket = new DatagramPacket(receiveData, receiveData.length); - - // Reveive and fill buffer. - serverSocket.receive(receivedPacket); - - return receivedPacket; - } - } diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/HttpGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/HttpGELFHandler.java new file mode 100644 index 000000000000..2c306c51a63b --- /dev/null +++ b/src/main/java/org/graylog2/messagehandlers/gelf/HttpGELFHandler.java @@ -0,0 +1,68 @@ +/** + * Copyright 2012 Kay Roepke + * + * This file is part of Graylog2. + * + * Graylog2 is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog2 is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog2. If not, see . + * + */ + +package org.graylog2.messagehandlers.gelf; + +import org.apache.log4j.Logger; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpHeaders; +import org.jboss.netty.handler.codec.http.HttpRequest; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpVersion; +import org.jboss.netty.util.CharsetUtil; + +public class HttpGELFHandler extends SimpleChannelUpstreamHandler { + private static final Logger LOG = Logger.getLogger(HttpGELFHandler.class); + + @Override + public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent event) + throws Exception { + final HttpRequest request = (HttpRequest) event.getMessage(); + final ChannelBuffer content = request.getContent(); + final String receivedGelfSentence = content.toString(CharsetUtil.UTF_8); + LOG.info("received http message: " + receivedGelfSentence); + new SimpleGELFClientHandler(receivedGelfSentence).handle(); + + // Build the response object. + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.ACCEPTED); + response.setContent(ChannelBuffers.copiedBuffer("", CharsetUtil.UTF_8)); + response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8"); + + final ChannelFuture future = event.getChannel().write(response); + future.addListener(ChannelFutureListener.CLOSE); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e) + throws Exception { + LOG.error("Exception caught while reading HTTP GELF message", e.getCause()); + e.getChannel().close(); + } + +} diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFClientHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFClientHandler.java index 7215003119fa..8ba8263c2598 100644 --- a/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFClientHandler.java +++ b/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFClientHandler.java @@ -20,6 +20,11 @@ package org.graylog2.messagehandlers.gelf; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.DatagramPacket; +import java.util.zip.DataFormatException; + import org.apache.log4j.Logger; import org.graylog2.Tools; import org.graylog2.blacklists.Blacklist; @@ -28,11 +33,6 @@ import org.graylog2.messagehandlers.common.MessageCountUpdateHook; import org.graylog2.messagehandlers.common.MessageParserHook; import org.graylog2.messagehandlers.common.ReceiveHookManager; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.DatagramPacket; -import java.util.zip.DataFormatException; import org.graylog2.messagequeue.MessageQueue; /** @@ -93,6 +93,7 @@ public SimpleGELFClientHandler(Object clientMessage) throws DataFormatException, } } + /** * Handles the client: Decodes JSON, Stores in Indexer, ReceiveHooks diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFHandler.java new file mode 100644 index 000000000000..db9c5979e848 --- /dev/null +++ b/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFHandler.java @@ -0,0 +1,49 @@ +/** + * Copyright 2012 Kay Roepke + * + * This file is part of Graylog2. + * + * Graylog2 is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog2 is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog2. If not, see . + * + */ + +package org.graylog2.messagehandlers.gelf; + +import org.apache.log4j.Logger; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.util.CharsetUtil; + +public class SimpleGELFHandler extends SimpleChannelUpstreamHandler { + private static final Logger LOG = Logger.getLogger(SimpleGELFClientHandler.class); + + @Override + public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) + throws Exception { + final ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); + new SimpleGELFClientHandler(buffer.toString(CharsetUtil.UTF_8)).handle(); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e) + throws Exception { + LOG.error("Could not handle single GELF message", e.getCause()); + e.getChannel().close(); + } + + +} diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/TcpGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/TcpGELFHandler.java new file mode 100644 index 000000000000..e16b77c1a1f2 --- /dev/null +++ b/src/main/java/org/graylog2/messagehandlers/gelf/TcpGELFHandler.java @@ -0,0 +1,50 @@ +/** + * Copyright 2012 Kay Roepke + * + * This file is part of Graylog2. + * + * Graylog2 is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog2 is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog2. If not, see . + * + */ + +package org.graylog2.messagehandlers.gelf; + +import org.apache.log4j.Logger; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.util.CharsetUtil; + +public class TcpGELFHandler extends SimpleChannelUpstreamHandler { + private static final Logger LOG = Logger.getLogger(TcpGELFHandler.class); + + @Override + public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) + throws Exception { + final ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); + LOG.info("received TCP GELF message from " + e.getRemoteAddress()); + new SimpleGELFClientHandler(buffer.toString(CharsetUtil.UTF_8)).handle(); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e) + throws Exception { + LOG.error("Could not handle TCP GELF message", e.getCause()); + e.getChannel().close(); + } + + +} diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/UdpGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/UdpGELFHandler.java new file mode 100644 index 000000000000..d02f5bce6c14 --- /dev/null +++ b/src/main/java/org/graylog2/messagehandlers/gelf/UdpGELFHandler.java @@ -0,0 +1,90 @@ +/** + * Copyright 2012 Kay Roepke + * + * This file is part of Graylog2. + * + * Graylog2 is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog2 is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog2. If not, see . + * + */ + +package org.graylog2.messagehandlers.gelf; + +import org.apache.log4j.Logger; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.handler.codec.compression.ZlibDecoder; +import org.jboss.netty.handler.codec.compression.ZlibWrapper; +import org.jboss.netty.handler.codec.frame.FrameDecoder; + +public class UdpGELFHandler extends FrameDecoder { + private static final Logger LOG = Logger.getLogger(UdpGELFHandler.class); + + @Override + protected Object decode(final ChannelHandlerContext ctx, final Channel channel, + final ChannelBuffer buffer) throws Exception { + if (buffer.readableBytes() < 2) { + return null; + } + final int magic1 = buffer.getUnsignedByte(buffer.readerIndex()); + final int magic2 = buffer.getUnsignedByte(buffer.readerIndex() + 1); + final byte[] data = new byte[] {(byte) magic1, (byte) magic2}; + + switch (GELF.getGELFType(data)) { + case GELF.TYPE_GZIP: + enableGzip(ctx); + break; + case GELF.TYPE_ZLIB: + enableZlib(ctx); + break; + case GELF.TYPE_CHUNKED: + enableChunked(ctx); + break; + default: { + // Unknown protocol; discard everything and close the connection. + buffer.skipBytes(buffer.readableBytes()); + ctx.getChannel().close(); + return null; + } + } + LOG.info("passing on bytes to next handler"); + return buffer.readBytes(buffer.readableBytes()); + } + + private void enableChunked(final ChannelHandlerContext ctx) { + LOG.info("enabling handling of chunked gelf messages on this connection"); + final ChannelPipeline pipeline = ctx.getPipeline(); + pipeline.addLast("chunked", new ChunkedGELFHandler()); + pipeline.remove(this); + } + + private void enableZlib(final ChannelHandlerContext ctx) { + LOG.info("enabling handling of zlib gelf messages on this connection"); + + final ChannelPipeline pipeline = ctx.getPipeline(); + pipeline.addLast("zlibdeflater", new ZlibDecoder(ZlibWrapper.ZLIB)); + pipeline.addLast("handler", new SimpleGELFHandler()); + pipeline.remove(this); + } + + private void enableGzip(final ChannelHandlerContext ctx) { + LOG.info("enabling handling of gzip gelf messages on this connection"); + + final ChannelPipeline pipeline = ctx.getPipeline(); + pipeline.addLast("gzipdeflater", new ZlibDecoder(ZlibWrapper.GZIP)); + pipeline.addLast("handler", new SimpleGELFHandler()); + pipeline.remove(this); + } +} From 16bbd2de727d5d4878f9a585459eefa2d50e9633 Mon Sep 17 00:00:00 2001 From: Kay Roepke Date: Mon, 2 Jan 2012 19:42:00 +0100 Subject: [PATCH 2/2] address review comment add compression to tcp handler --- src/main/java/org/graylog2/Main.java | 3 +- .../messagehandlers/gelf/HttpGELFHandler.java | 2 +- .../messagehandlers/gelf/TcpGELFHandler.java | 66 ++++++++++++++----- .../messagehandlers/gelf/UdpGELFHandler.java | 2 +- 4 files changed, 53 insertions(+), 20 deletions(-) diff --git a/src/main/java/org/graylog2/Main.java b/src/main/java/org/graylog2/Main.java index 70f587543ace..79b43e4c739c 100644 --- a/src/main/java/org/graylog2/Main.java +++ b/src/main/java/org/graylog2/Main.java @@ -244,7 +244,8 @@ private static void initializeGELFThreads(String gelfAddress, int gelfPort, bool ChunkedGELFClientManagerThread.PERIOD, TimeUnit.SECONDS); LOG.info("GELF threads started"); } else { - LOG.error("GELF threads could not be started."); + LOG.fatal("GELF threads could not be started."); + System.exit(1); } } diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/HttpGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/HttpGELFHandler.java index 2c306c51a63b..9126c648bca0 100644 --- a/src/main/java/org/graylog2/messagehandlers/gelf/HttpGELFHandler.java +++ b/src/main/java/org/graylog2/messagehandlers/gelf/HttpGELFHandler.java @@ -39,7 +39,7 @@ public class HttpGELFHandler extends SimpleChannelUpstreamHandler { private static final Logger LOG = Logger.getLogger(HttpGELFHandler.class); - + @Override public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent event) throws Exception { diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/TcpGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/TcpGELFHandler.java index e16b77c1a1f2..1f0a1aad4929 100644 --- a/src/main/java/org/graylog2/messagehandlers/gelf/TcpGELFHandler.java +++ b/src/main/java/org/graylog2/messagehandlers/gelf/TcpGELFHandler.java @@ -22,29 +22,61 @@ import org.apache.log4j.Logger; import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.util.CharsetUtil; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.handler.codec.compression.ZlibDecoder; +import org.jboss.netty.handler.codec.compression.ZlibWrapper; +import org.jboss.netty.handler.codec.frame.FrameDecoder; -public class TcpGELFHandler extends SimpleChannelUpstreamHandler { +public class TcpGELFHandler extends FrameDecoder { private static final Logger LOG = Logger.getLogger(TcpGELFHandler.class); - + + // TODO refactor, duplicated across a couple of places now. @Override - public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) - throws Exception { - final ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); - LOG.info("received TCP GELF message from " + e.getRemoteAddress()); - new SimpleGELFClientHandler(buffer.toString(CharsetUtil.UTF_8)).handle(); + protected Object decode(ChannelHandlerContext ctx, Channel channel, + ChannelBuffer buffer) throws Exception { + if (buffer.readableBytes() < 2) { + return null; + } + final int magic1 = buffer.getUnsignedByte(buffer.readerIndex()); + final int magic2 = buffer.getUnsignedByte(buffer.readerIndex() + 1); + final byte[] data = new byte[] {(byte) magic1, (byte) magic2}; + + switch (GELF.getGELFType(data)) { + case GELF.TYPE_GZIP: + enableGzip(ctx); + break; + case GELF.TYPE_ZLIB: + enableZlib(ctx); + break; + default: { + // Unknown protocol; discard everything and close the connection. + LOG.info("unknown protocol for GELF message, discarding"); + buffer.skipBytes(buffer.readableBytes()); + ctx.getChannel().close(); + return null; + } + } + return buffer.readBytes(buffer.readableBytes()); } - @Override - public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e) - throws Exception { - LOG.error("Could not handle TCP GELF message", e.getCause()); - e.getChannel().close(); + private void enableZlib(final ChannelHandlerContext ctx) { + LOG.debug("enabling handling of zlib gelf messages on this connection"); + + final ChannelPipeline pipeline = ctx.getPipeline(); + pipeline.addLast("zlibdeflater", new ZlibDecoder(ZlibWrapper.ZLIB)); + pipeline.addLast("handler", new SimpleGELFHandler()); + pipeline.remove(this); + } + + private void enableGzip(final ChannelHandlerContext ctx) { + LOG.debug("enabling handling of gzip gelf messages on this connection"); + + final ChannelPipeline pipeline = ctx.getPipeline(); + pipeline.addLast("gzipdeflater", new ZlibDecoder(ZlibWrapper.GZIP)); + pipeline.addLast("handler", new SimpleGELFHandler()); + pipeline.remove(this); } - } diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/UdpGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/UdpGELFHandler.java index d02f5bce6c14..b50c1d76ce2f 100644 --- a/src/main/java/org/graylog2/messagehandlers/gelf/UdpGELFHandler.java +++ b/src/main/java/org/graylog2/messagehandlers/gelf/UdpGELFHandler.java @@ -54,12 +54,12 @@ protected Object decode(final ChannelHandlerContext ctx, final Channel channel, break; default: { // Unknown protocol; discard everything and close the connection. + LOG.info("unknown protocol for GELF message, discarding"); buffer.skipBytes(buffer.readableBytes()); ctx.getChannel().close(); return null; } } - LOG.info("passing on bytes to next handler"); return buffer.readBytes(buffer.readableBytes()); }