diff --git a/misc/graylog2.conf b/misc/graylog2.conf index f4aee9302c97..295c0bb1682b 100644 --- a/misc/graylog2.conf +++ b/misc/graylog2.conf @@ -47,6 +47,8 @@ mongodb_threads_allowed_to_block_multiplier = 5 use_gelf = true gelf_listen_address = 0.0.0.0 gelf_listen_port = 12201 +use_gelf_tcp = false +# gelf_http_listen_port = 12202 # Drools Rule File (Use to rewrite incoming log messages) # rules_file = /etc/graylog2.d/rules/graylog2.drl diff --git a/pom.xml b/pom.xml index 81ee5c287b79..49c352d2290c 100644 --- a/pom.xml +++ b/pom.xml @@ -82,6 +82,11 @@ slf4j-log4j12 1.6.1 + + org.jboss.netty + netty + 3.2.7.Final + diff --git a/src/main/java/org/graylog2/Configuration.java b/src/main/java/org/graylog2/Configuration.java index eb3d71b7b078..53c5945beac0 100644 --- a/src/main/java/org/graylog2/Configuration.java +++ b/src/main/java/org/graylog2/Configuration.java @@ -20,6 +20,15 @@ package org.graylog2; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.log4j.Logger; +import org.graylog2.messagehandlers.amqp.AMQPSubscribedQueue; +import org.graylog2.messagehandlers.amqp.InvalidQueueTypeException; + import com.github.joschi.jadconfig.Parameter; import com.github.joschi.jadconfig.ValidationException; import com.github.joschi.jadconfig.ValidatorMethod; @@ -28,14 +37,6 @@ import com.github.joschi.jadconfig.validators.PositiveIntegerValidator; import com.github.joschi.jadconfig.validators.PositiveLongValidator; import com.mongodb.ServerAddress; -import org.apache.log4j.Logger; -import org.graylog2.messagehandlers.amqp.AMQPSubscribedQueue; -import org.graylog2.messagehandlers.amqp.InvalidQueueTypeException; - -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; /** * Helper class to hold configuration of Graylog2 @@ -113,6 +114,12 @@ public class Configuration { @Parameter(value = "gelf_listen_port", required = true, validator = InetPortValidator.class) private int gelfListenPort = 12201; + @Parameter(value = "use_gelf_tcp") + private boolean useGelfTcp = false; + + @Parameter(value = "gelf_http_listen_port") + private int gelfHttpListenPort = 0; + @Parameter("amqp_enabled") private boolean amqpEnabled = false; @@ -231,6 +238,14 @@ public int getGelfListenPort() { return gelfListenPort; } + public boolean isUseGelfTcp() { + return useGelfTcp; + } + + public int getGelfHttpListenPort() { + return gelfHttpListenPort; + } + public boolean isAmqpEnabled() { return amqpEnabled; } diff --git a/src/main/java/org/graylog2/Main.java b/src/main/java/org/graylog2/Main.java index 232f7295e056..79b43e4c739c 100644 --- a/src/main/java/org/graylog2/Main.java +++ b/src/main/java/org/graylog2/Main.java @@ -20,11 +20,15 @@ package org.graylog2; -import com.beust.jcommander.JCommander; -import com.github.joschi.jadconfig.JadConfig; -import com.github.joschi.jadconfig.RepositoryException; -import com.github.joschi.jadconfig.ValidationException; -import com.github.joschi.jadconfig.repositories.PropertiesRepository; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import org.apache.commons.io.IOUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -35,7 +39,7 @@ import org.graylog2.messagehandlers.amqp.AMQPSubscribedQueue; import org.graylog2.messagehandlers.amqp.AMQPSubscriberThread; import org.graylog2.messagehandlers.gelf.ChunkedGELFClientManager; -import org.graylog2.messagehandlers.gelf.GELFMainThread; +import org.graylog2.messagehandlers.gelf.GELFServer; import org.graylog2.messagehandlers.syslog.SyslogServerThread; import org.graylog2.messagequeue.MessageQueue; import org.graylog2.messagequeue.MessageQueueFlusher; @@ -46,14 +50,11 @@ import org.graylog2.periodical.MessageRetentionThread; import org.graylog2.periodical.ServerValueWriterThread; -import java.io.FileWriter; -import java.io.IOException; -import java.io.Writer; -import java.net.InetSocketAddress; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import com.beust.jcommander.JCommander; +import com.github.joschi.jadconfig.JadConfig; +import com.github.joschi.jadconfig.RepositoryException; +import com.github.joschi.jadconfig.ValidationException; +import com.github.joschi.jadconfig.repositories.PropertiesRepository; /** * Main class of Graylog2. @@ -168,7 +169,10 @@ public static void main(String[] args) { // Start GELF threads if (configuration.isUseGELF()) { - initializeGELFThreads(configuration.getGelfListenAddress(), configuration.getGelfListenPort(), scheduler); + initializeGELFThreads(configuration.getGelfListenAddress(), + configuration.getGelfListenPort(), + configuration.isUseGelfTcp(), + configuration.getGelfHttpListenPort(), scheduler); } // Initialize AMQP Broker if enabled @@ -231,13 +235,19 @@ private static void initializeMessageRetentionThread(ScheduledExecutorService sc LOG.info("Retention time management active."); } - private static void initializeGELFThreads(String gelfAddress, int gelfPort, ScheduledExecutorService scheduler) { - GELFMainThread gelfThread = new GELFMainThread(new InetSocketAddress(gelfAddress, gelfPort)); - gelfThread.start(); + private static void initializeGELFThreads(String gelfAddress, int gelfPort, boolean useTcp, int httpListenPort, ScheduledExecutorService scheduler) { - scheduler.scheduleAtFixedRate(new ChunkedGELFClientManagerThread(ChunkedGELFClientManager.getInstance()), ChunkedGELFClientManagerThread.INITIAL_DELAY, ChunkedGELFClientManagerThread.PERIOD, TimeUnit.SECONDS); + if (GELFServer.create(new InetSocketAddress(gelfAddress, gelfPort), useTcp, httpListenPort)) { + scheduler.scheduleAtFixedRate(new ChunkedGELFClientManagerThread( + ChunkedGELFClientManager.getInstance()), + ChunkedGELFClientManagerThread.INITIAL_DELAY, + ChunkedGELFClientManagerThread.PERIOD, TimeUnit.SECONDS); + LOG.info("GELF threads started"); + } else { + LOG.fatal("GELF threads could not be started."); + System.exit(1); + } - LOG.info("GELF threads started"); } private static void initializeSyslogServer(String syslogProtocol, int syslogPort) { diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFClientHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFClientHandler.java index 69ba003d9578..f342eb53eaee 100644 --- a/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFClientHandler.java +++ b/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFClientHandler.java @@ -20,6 +20,10 @@ package org.graylog2.messagehandlers.gelf; +import java.io.IOException; +import java.net.DatagramPacket; +import java.util.zip.DataFormatException; + import org.apache.log4j.Logger; import org.graylog2.Tools; import org.graylog2.blacklists.Blacklist; @@ -28,10 +32,6 @@ import org.graylog2.messagehandlers.common.MessageCountUpdateHook; import org.graylog2.messagehandlers.common.MessageParserHook; import org.graylog2.messagehandlers.common.ReceiveHookManager; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.util.zip.DataFormatException; import org.graylog2.messagequeue.MessageQueue; /** @@ -109,6 +109,10 @@ public ChunkedGELFClientHandler(DatagramPacket clientMessage) throws GELFExcepti } } + public ChunkedGELFClientHandler(byte[] data) throws GELFException, IOException, DataFormatException { + this(new DatagramPacket(data, data.length)); + } + private void decompress(byte[] data, String hash) throws InvalidGELFCompressionMethodException, IOException { // Determine compression type. int type = GELF.getGELFType(data); diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFHandler.java new file mode 100644 index 000000000000..fb027cd76d09 --- /dev/null +++ b/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFHandler.java @@ -0,0 +1,51 @@ +/** + * Copyright 2012 Kay Roepke + * + * This file is part of Graylog2. + * + * Graylog2 is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog2 is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog2. If not, see . + * + */ + +package org.graylog2.messagehandlers.gelf; + +import org.apache.log4j.Logger; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; + +public class ChunkedGELFHandler extends SimpleChannelUpstreamHandler { + private static final Logger LOG = Logger.getLogger(ChunkedGELFHandler.class); + + @Override + public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) + throws Exception { + final ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); + final byte[] data = new byte[buffer.readableBytes()]; + buffer.getBytes(0, data); + LOG.info("received chunked gelf messages, passing on to handler"); + new ChunkedGELFClientHandler(data).handle(); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e) + throws Exception { + LOG.error("Could not handle chunked GELF message", e.getCause()); + e.getChannel().close(); + } + + +} diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/GELFMainThread.java b/src/main/java/org/graylog2/messagehandlers/gelf/GELFMainThread.java deleted file mode 100644 index d3fefd1e032f..000000000000 --- a/src/main/java/org/graylog2/messagehandlers/gelf/GELFMainThread.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Copyright 2010 Lennart Koopmann - * - * This file is part of Graylog2. - * - * Graylog2 is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Graylog2 is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Graylog2. If not, see . - * - */ - -package org.graylog2.messagehandlers.gelf; - -import org.apache.log4j.Logger; - -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -/** - * GELFMainThread.java: Jun 23, 2010 6:43:21 PM - * - * Thread responsible for listening for GELF messages. - * - * @author Lennart Koopmann - */ -public class GELFMainThread extends Thread { - - private static final Logger LOG = Logger.getLogger(GELFMainThread.class); - - private InetSocketAddress socketAddress; - - private ExecutorService threadPool = Executors.newCachedThreadPool(); - - /** - * Thread responsible for listening for GELF messages. - * - * @param socketAddress The {@link InetSocketAddress} to bind to - */ - public GELFMainThread(InetSocketAddress socketAddress) { - this.socketAddress = socketAddress; - } - - /** - * Run the thread. Runs forever! - */ - @Override public void run() { - GELFServer server = new GELFServer(); - if (!server.create(socketAddress)) { - throw new RuntimeException("Could not start GELF server. Do you have permissions to bind to " + socketAddress + "?"); - } - - // Run forever. - while (true) { - try { - // Listen on socket. - DatagramPacket receivedGelfSentence = server.listen(); - - // Skip empty sentences. - if (receivedGelfSentence.getLength() == 0) { - continue; - } - - // We got a connected client. Start a GELFClientHandlerThread() in our thread pool and wait for next client. - threadPool.execute(new GELFClientHandlerThread(receivedGelfSentence)); - } catch (Exception e) { - LOG.error("Skipping GELF client. Error: " + e.getMessage(), e); - } - } - } - -} diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/GELFServer.java b/src/main/java/org/graylog2/messagehandlers/gelf/GELFServer.java index 1a7cf03da18a..3d06353e4051 100644 --- a/src/main/java/org/graylog2/messagehandlers/gelf/GELFServer.java +++ b/src/main/java/org/graylog2/messagehandlers/gelf/GELFServer.java @@ -20,12 +20,25 @@ package org.graylog2.messagehandlers.gelf; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; import java.net.InetSocketAddress; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.log4j.Logger; +import org.jboss.netty.bootstrap.ConnectionlessBootstrap; +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; +import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; +import org.jboss.netty.handler.codec.frame.Delimiters; +import org.jboss.netty.handler.codec.http.HttpChunkAggregator; +import org.jboss.netty.handler.codec.http.HttpContentCompressor; +import org.jboss.netty.handler.codec.http.HttpRequestDecoder; +import org.jboss.netty.handler.codec.http.HttpResponseEncoder; /** * GELFThread.java: Jun 23, 2010 6:58:07 PM @@ -43,44 +56,86 @@ public class GELFServer { */ public static final int MAX_PACKET_SIZE = 8192; - private DatagramSocket serverSocket = null; - /** - * Create the UDP socket. + * Create the configured servers to accept GELF messages on: UDP and optionally nul-byte delimited TCP, and HTTP. * * @param socketAddress The {@link InetSocketAddress} to bind to - * @return boolean + * @param useTcp true if the TCP server should be started as well + * @param httpListenPort if non-zero the port on which to listen to HTTP requests + * @return boolean true if all requested servers could be bound, false otherwise */ - public boolean create(InetSocketAddress socketAddress) { + public static boolean create(InetSocketAddress socketAddress, boolean useTcp, int httpListenPort) { + final ExecutorService bossThreadPool = Executors.newCachedThreadPool(); + final ExecutorService workerThreadPool = Executors.newCachedThreadPool(); + + // UDP GELF server + + final ConnectionlessBootstrap udpBootstrap = new ConnectionlessBootstrap( + new NioDatagramChannelFactory(workerThreadPool) + ); + udpBootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + return Channels.pipeline(new UdpGELFHandler()); + } + }); + + // TCP GELF server + ServerBootstrap tcpBootstrap = null; + if (useTcp) { + tcpBootstrap = new ServerBootstrap( + new NioServerSocketChannelFactory( + bossThreadPool, workerThreadPool) + ); + tcpBootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline p = Channels.pipeline(); + p.addLast("framer", new DelimiterBasedFrameDecoder(2 * 1024 * 1024, + Delimiters.nulDelimiter())); + p.addLast("handler", new TcpGELFHandler()); + return p; + } + }); + } + + // HTTP GELF server + ServerBootstrap httpBootstrap = null; + if (httpListenPort != 0) { + httpBootstrap = new ServerBootstrap( + new NioServerSocketChannelFactory(bossThreadPool, workerThreadPool) + ); + httpBootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = Channels.pipeline(); + pipeline.addLast("decoder", new HttpRequestDecoder()); + pipeline.addLast("aggregator", new HttpChunkAggregator(1048576)); + pipeline.addLast("encoder", new HttpResponseEncoder()); + pipeline.addLast("deflater", new HttpContentCompressor()); + pipeline.addLast("handler", new HttpGELFHandler()); + + return pipeline; + } + }); + } try { - this.serverSocket = new DatagramSocket(socketAddress); - } catch(IOException e) { - LOG.fatal("Could not create ServerSocket in GELFServer::create(): " + e.getMessage(), e); + udpBootstrap.bind(socketAddress); + LOG.info("Started UDP GELF server on " + socketAddress); + if (useTcp) { + tcpBootstrap.bind(socketAddress); + LOG.info("Started TCP GELF server on " + socketAddress); + } + if (httpListenPort != 0) { + httpBootstrap.bind(new InetSocketAddress(socketAddress.getAddress(), httpListenPort)); + LOG.info("Started HTTP GELF server on " + socketAddress.getAddress() + ":" + httpListenPort); + } + } catch (ChannelException e) { + LOG.fatal("Could not bind GELF server to address " + socketAddress, e); return false; } - LOG.info("Started GELF server on " + socketAddress); - return true; } - /** - * Listens on the formerly created (create()) socket and returns - * unzipped (GZIP) raw message that can be parsed to a GELFMessage. - * - * @return Received message - * @throws IOException - */ - public DatagramPacket listen() throws IOException { - - // Create buffer. - byte[] receiveData = new byte[MAX_PACKET_SIZE]; - DatagramPacket receivedPacket = new DatagramPacket(receiveData, receiveData.length); - - // Reveive and fill buffer. - serverSocket.receive(receivedPacket); - - return receivedPacket; - } - } diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/HttpGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/HttpGELFHandler.java new file mode 100644 index 000000000000..9126c648bca0 --- /dev/null +++ b/src/main/java/org/graylog2/messagehandlers/gelf/HttpGELFHandler.java @@ -0,0 +1,68 @@ +/** + * Copyright 2012 Kay Roepke + * + * This file is part of Graylog2. + * + * Graylog2 is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog2 is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog2. If not, see . + * + */ + +package org.graylog2.messagehandlers.gelf; + +import org.apache.log4j.Logger; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpHeaders; +import org.jboss.netty.handler.codec.http.HttpRequest; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpVersion; +import org.jboss.netty.util.CharsetUtil; + +public class HttpGELFHandler extends SimpleChannelUpstreamHandler { + private static final Logger LOG = Logger.getLogger(HttpGELFHandler.class); + + @Override + public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent event) + throws Exception { + final HttpRequest request = (HttpRequest) event.getMessage(); + final ChannelBuffer content = request.getContent(); + final String receivedGelfSentence = content.toString(CharsetUtil.UTF_8); + LOG.info("received http message: " + receivedGelfSentence); + new SimpleGELFClientHandler(receivedGelfSentence).handle(); + + // Build the response object. + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.ACCEPTED); + response.setContent(ChannelBuffers.copiedBuffer("", CharsetUtil.UTF_8)); + response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8"); + + final ChannelFuture future = event.getChannel().write(response); + future.addListener(ChannelFutureListener.CLOSE); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e) + throws Exception { + LOG.error("Exception caught while reading HTTP GELF message", e.getCause()); + e.getChannel().close(); + } + +} diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFClientHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFClientHandler.java index 7215003119fa..8ba8263c2598 100644 --- a/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFClientHandler.java +++ b/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFClientHandler.java @@ -20,6 +20,11 @@ package org.graylog2.messagehandlers.gelf; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.DatagramPacket; +import java.util.zip.DataFormatException; + import org.apache.log4j.Logger; import org.graylog2.Tools; import org.graylog2.blacklists.Blacklist; @@ -28,11 +33,6 @@ import org.graylog2.messagehandlers.common.MessageCountUpdateHook; import org.graylog2.messagehandlers.common.MessageParserHook; import org.graylog2.messagehandlers.common.ReceiveHookManager; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.DatagramPacket; -import java.util.zip.DataFormatException; import org.graylog2.messagequeue.MessageQueue; /** @@ -93,6 +93,7 @@ public SimpleGELFClientHandler(Object clientMessage) throws DataFormatException, } } + /** * Handles the client: Decodes JSON, Stores in Indexer, ReceiveHooks diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFHandler.java new file mode 100644 index 000000000000..db9c5979e848 --- /dev/null +++ b/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFHandler.java @@ -0,0 +1,49 @@ +/** + * Copyright 2012 Kay Roepke + * + * This file is part of Graylog2. + * + * Graylog2 is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog2 is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog2. If not, see . + * + */ + +package org.graylog2.messagehandlers.gelf; + +import org.apache.log4j.Logger; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.util.CharsetUtil; + +public class SimpleGELFHandler extends SimpleChannelUpstreamHandler { + private static final Logger LOG = Logger.getLogger(SimpleGELFClientHandler.class); + + @Override + public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) + throws Exception { + final ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); + new SimpleGELFClientHandler(buffer.toString(CharsetUtil.UTF_8)).handle(); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e) + throws Exception { + LOG.error("Could not handle single GELF message", e.getCause()); + e.getChannel().close(); + } + + +} diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/TcpGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/TcpGELFHandler.java new file mode 100644 index 000000000000..1f0a1aad4929 --- /dev/null +++ b/src/main/java/org/graylog2/messagehandlers/gelf/TcpGELFHandler.java @@ -0,0 +1,82 @@ +/** + * Copyright 2012 Kay Roepke + * + * This file is part of Graylog2. + * + * Graylog2 is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog2 is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog2. If not, see . + * + */ + +package org.graylog2.messagehandlers.gelf; + +import org.apache.log4j.Logger; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.handler.codec.compression.ZlibDecoder; +import org.jboss.netty.handler.codec.compression.ZlibWrapper; +import org.jboss.netty.handler.codec.frame.FrameDecoder; + +public class TcpGELFHandler extends FrameDecoder { + private static final Logger LOG = Logger.getLogger(TcpGELFHandler.class); + + // TODO refactor, duplicated across a couple of places now. + @Override + protected Object decode(ChannelHandlerContext ctx, Channel channel, + ChannelBuffer buffer) throws Exception { + if (buffer.readableBytes() < 2) { + return null; + } + final int magic1 = buffer.getUnsignedByte(buffer.readerIndex()); + final int magic2 = buffer.getUnsignedByte(buffer.readerIndex() + 1); + final byte[] data = new byte[] {(byte) magic1, (byte) magic2}; + + switch (GELF.getGELFType(data)) { + case GELF.TYPE_GZIP: + enableGzip(ctx); + break; + case GELF.TYPE_ZLIB: + enableZlib(ctx); + break; + default: { + // Unknown protocol; discard everything and close the connection. + LOG.info("unknown protocol for GELF message, discarding"); + buffer.skipBytes(buffer.readableBytes()); + ctx.getChannel().close(); + return null; + } + } + return buffer.readBytes(buffer.readableBytes()); + } + + private void enableZlib(final ChannelHandlerContext ctx) { + LOG.debug("enabling handling of zlib gelf messages on this connection"); + + final ChannelPipeline pipeline = ctx.getPipeline(); + pipeline.addLast("zlibdeflater", new ZlibDecoder(ZlibWrapper.ZLIB)); + pipeline.addLast("handler", new SimpleGELFHandler()); + pipeline.remove(this); + } + + private void enableGzip(final ChannelHandlerContext ctx) { + LOG.debug("enabling handling of gzip gelf messages on this connection"); + + final ChannelPipeline pipeline = ctx.getPipeline(); + pipeline.addLast("gzipdeflater", new ZlibDecoder(ZlibWrapper.GZIP)); + pipeline.addLast("handler", new SimpleGELFHandler()); + pipeline.remove(this); + } + +} diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/UdpGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/UdpGELFHandler.java new file mode 100644 index 000000000000..b50c1d76ce2f --- /dev/null +++ b/src/main/java/org/graylog2/messagehandlers/gelf/UdpGELFHandler.java @@ -0,0 +1,90 @@ +/** + * Copyright 2012 Kay Roepke + * + * This file is part of Graylog2. + * + * Graylog2 is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog2 is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog2. If not, see . + * + */ + +package org.graylog2.messagehandlers.gelf; + +import org.apache.log4j.Logger; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.handler.codec.compression.ZlibDecoder; +import org.jboss.netty.handler.codec.compression.ZlibWrapper; +import org.jboss.netty.handler.codec.frame.FrameDecoder; + +public class UdpGELFHandler extends FrameDecoder { + private static final Logger LOG = Logger.getLogger(UdpGELFHandler.class); + + @Override + protected Object decode(final ChannelHandlerContext ctx, final Channel channel, + final ChannelBuffer buffer) throws Exception { + if (buffer.readableBytes() < 2) { + return null; + } + final int magic1 = buffer.getUnsignedByte(buffer.readerIndex()); + final int magic2 = buffer.getUnsignedByte(buffer.readerIndex() + 1); + final byte[] data = new byte[] {(byte) magic1, (byte) magic2}; + + switch (GELF.getGELFType(data)) { + case GELF.TYPE_GZIP: + enableGzip(ctx); + break; + case GELF.TYPE_ZLIB: + enableZlib(ctx); + break; + case GELF.TYPE_CHUNKED: + enableChunked(ctx); + break; + default: { + // Unknown protocol; discard everything and close the connection. + LOG.info("unknown protocol for GELF message, discarding"); + buffer.skipBytes(buffer.readableBytes()); + ctx.getChannel().close(); + return null; + } + } + return buffer.readBytes(buffer.readableBytes()); + } + + private void enableChunked(final ChannelHandlerContext ctx) { + LOG.info("enabling handling of chunked gelf messages on this connection"); + final ChannelPipeline pipeline = ctx.getPipeline(); + pipeline.addLast("chunked", new ChunkedGELFHandler()); + pipeline.remove(this); + } + + private void enableZlib(final ChannelHandlerContext ctx) { + LOG.info("enabling handling of zlib gelf messages on this connection"); + + final ChannelPipeline pipeline = ctx.getPipeline(); + pipeline.addLast("zlibdeflater", new ZlibDecoder(ZlibWrapper.ZLIB)); + pipeline.addLast("handler", new SimpleGELFHandler()); + pipeline.remove(this); + } + + private void enableGzip(final ChannelHandlerContext ctx) { + LOG.info("enabling handling of gzip gelf messages on this connection"); + + final ChannelPipeline pipeline = ctx.getPipeline(); + pipeline.addLast("gzipdeflater", new ZlibDecoder(ZlibWrapper.GZIP)); + pipeline.addLast("handler", new SimpleGELFHandler()); + pipeline.remove(this); + } +}