diff --git a/misc/graylog2.conf b/misc/graylog2.conf
index f4aee9302c97..295c0bb1682b 100644
--- a/misc/graylog2.conf
+++ b/misc/graylog2.conf
@@ -47,6 +47,8 @@ mongodb_threads_allowed_to_block_multiplier = 5
use_gelf = true
gelf_listen_address = 0.0.0.0
gelf_listen_port = 12201
+use_gelf_tcp = false
+# gelf_http_listen_port = 12202
# Drools Rule File (Use to rewrite incoming log messages)
# rules_file = /etc/graylog2.d/rules/graylog2.drl
diff --git a/pom.xml b/pom.xml
index 81ee5c287b79..49c352d2290c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,11 @@
slf4j-log4j12
1.6.1
+
+ org.jboss.netty
+ netty
+ 3.2.7.Final
+
diff --git a/src/main/java/org/graylog2/Configuration.java b/src/main/java/org/graylog2/Configuration.java
index eb3d71b7b078..53c5945beac0 100644
--- a/src/main/java/org/graylog2/Configuration.java
+++ b/src/main/java/org/graylog2/Configuration.java
@@ -20,6 +20,15 @@
package org.graylog2;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.graylog2.messagehandlers.amqp.AMQPSubscribedQueue;
+import org.graylog2.messagehandlers.amqp.InvalidQueueTypeException;
+
import com.github.joschi.jadconfig.Parameter;
import com.github.joschi.jadconfig.ValidationException;
import com.github.joschi.jadconfig.ValidatorMethod;
@@ -28,14 +37,6 @@
import com.github.joschi.jadconfig.validators.PositiveIntegerValidator;
import com.github.joschi.jadconfig.validators.PositiveLongValidator;
import com.mongodb.ServerAddress;
-import org.apache.log4j.Logger;
-import org.graylog2.messagehandlers.amqp.AMQPSubscribedQueue;
-import org.graylog2.messagehandlers.amqp.InvalidQueueTypeException;
-
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
/**
* Helper class to hold configuration of Graylog2
@@ -113,6 +114,12 @@ public class Configuration {
@Parameter(value = "gelf_listen_port", required = true, validator = InetPortValidator.class)
private int gelfListenPort = 12201;
+ @Parameter(value = "use_gelf_tcp")
+ private boolean useGelfTcp = false;
+
+ @Parameter(value = "gelf_http_listen_port")
+ private int gelfHttpListenPort = 0;
+
@Parameter("amqp_enabled")
private boolean amqpEnabled = false;
@@ -231,6 +238,14 @@ public int getGelfListenPort() {
return gelfListenPort;
}
+ public boolean isUseGelfTcp() {
+ return useGelfTcp;
+ }
+
+ public int getGelfHttpListenPort() {
+ return gelfHttpListenPort;
+ }
+
public boolean isAmqpEnabled() {
return amqpEnabled;
}
diff --git a/src/main/java/org/graylog2/Main.java b/src/main/java/org/graylog2/Main.java
index 232f7295e056..79b43e4c739c 100644
--- a/src/main/java/org/graylog2/Main.java
+++ b/src/main/java/org/graylog2/Main.java
@@ -20,11 +20,15 @@
package org.graylog2;
-import com.beust.jcommander.JCommander;
-import com.github.joschi.jadconfig.JadConfig;
-import com.github.joschi.jadconfig.RepositoryException;
-import com.github.joschi.jadconfig.ValidationException;
-import com.github.joschi.jadconfig.repositories.PropertiesRepository;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -35,7 +39,7 @@
import org.graylog2.messagehandlers.amqp.AMQPSubscribedQueue;
import org.graylog2.messagehandlers.amqp.AMQPSubscriberThread;
import org.graylog2.messagehandlers.gelf.ChunkedGELFClientManager;
-import org.graylog2.messagehandlers.gelf.GELFMainThread;
+import org.graylog2.messagehandlers.gelf.GELFServer;
import org.graylog2.messagehandlers.syslog.SyslogServerThread;
import org.graylog2.messagequeue.MessageQueue;
import org.graylog2.messagequeue.MessageQueueFlusher;
@@ -46,14 +50,11 @@
import org.graylog2.periodical.MessageRetentionThread;
import org.graylog2.periodical.ServerValueWriterThread;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.Writer;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import com.beust.jcommander.JCommander;
+import com.github.joschi.jadconfig.JadConfig;
+import com.github.joschi.jadconfig.RepositoryException;
+import com.github.joschi.jadconfig.ValidationException;
+import com.github.joschi.jadconfig.repositories.PropertiesRepository;
/**
* Main class of Graylog2.
@@ -168,7 +169,10 @@ public static void main(String[] args) {
// Start GELF threads
if (configuration.isUseGELF()) {
- initializeGELFThreads(configuration.getGelfListenAddress(), configuration.getGelfListenPort(), scheduler);
+ initializeGELFThreads(configuration.getGelfListenAddress(),
+ configuration.getGelfListenPort(),
+ configuration.isUseGelfTcp(),
+ configuration.getGelfHttpListenPort(), scheduler);
}
// Initialize AMQP Broker if enabled
@@ -231,13 +235,19 @@ private static void initializeMessageRetentionThread(ScheduledExecutorService sc
LOG.info("Retention time management active.");
}
- private static void initializeGELFThreads(String gelfAddress, int gelfPort, ScheduledExecutorService scheduler) {
- GELFMainThread gelfThread = new GELFMainThread(new InetSocketAddress(gelfAddress, gelfPort));
- gelfThread.start();
+ private static void initializeGELFThreads(String gelfAddress, int gelfPort, boolean useTcp, int httpListenPort, ScheduledExecutorService scheduler) {
- scheduler.scheduleAtFixedRate(new ChunkedGELFClientManagerThread(ChunkedGELFClientManager.getInstance()), ChunkedGELFClientManagerThread.INITIAL_DELAY, ChunkedGELFClientManagerThread.PERIOD, TimeUnit.SECONDS);
+ if (GELFServer.create(new InetSocketAddress(gelfAddress, gelfPort), useTcp, httpListenPort)) {
+ scheduler.scheduleAtFixedRate(new ChunkedGELFClientManagerThread(
+ ChunkedGELFClientManager.getInstance()),
+ ChunkedGELFClientManagerThread.INITIAL_DELAY,
+ ChunkedGELFClientManagerThread.PERIOD, TimeUnit.SECONDS);
+ LOG.info("GELF threads started");
+ } else {
+ LOG.fatal("GELF threads could not be started.");
+ System.exit(1);
+ }
- LOG.info("GELF threads started");
}
private static void initializeSyslogServer(String syslogProtocol, int syslogPort) {
diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFClientHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFClientHandler.java
index 69ba003d9578..f342eb53eaee 100644
--- a/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFClientHandler.java
+++ b/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFClientHandler.java
@@ -20,6 +20,10 @@
package org.graylog2.messagehandlers.gelf;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.util.zip.DataFormatException;
+
import org.apache.log4j.Logger;
import org.graylog2.Tools;
import org.graylog2.blacklists.Blacklist;
@@ -28,10 +32,6 @@
import org.graylog2.messagehandlers.common.MessageCountUpdateHook;
import org.graylog2.messagehandlers.common.MessageParserHook;
import org.graylog2.messagehandlers.common.ReceiveHookManager;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.util.zip.DataFormatException;
import org.graylog2.messagequeue.MessageQueue;
/**
@@ -109,6 +109,10 @@ public ChunkedGELFClientHandler(DatagramPacket clientMessage) throws GELFExcepti
}
}
+ public ChunkedGELFClientHandler(byte[] data) throws GELFException, IOException, DataFormatException {
+ this(new DatagramPacket(data, data.length));
+ }
+
private void decompress(byte[] data, String hash) throws InvalidGELFCompressionMethodException, IOException {
// Determine compression type.
int type = GELF.getGELFType(data);
diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFHandler.java
new file mode 100644
index 000000000000..fb027cd76d09
--- /dev/null
+++ b/src/main/java/org/graylog2/messagehandlers/gelf/ChunkedGELFHandler.java
@@ -0,0 +1,51 @@
+/**
+ * Copyright 2012 Kay Roepke
+ *
+ * This file is part of Graylog2.
+ *
+ * Graylog2 is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Graylog2 is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Graylog2. If not, see .
+ *
+ */
+
+package org.graylog2.messagehandlers.gelf;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+
+public class ChunkedGELFHandler extends SimpleChannelUpstreamHandler {
+ private static final Logger LOG = Logger.getLogger(ChunkedGELFHandler.class);
+
+ @Override
+ public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e)
+ throws Exception {
+ final ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+ final byte[] data = new byte[buffer.readableBytes()];
+ buffer.getBytes(0, data);
+ LOG.info("received chunked gelf messages, passing on to handler");
+ new ChunkedGELFClientHandler(data).handle();
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e)
+ throws Exception {
+ LOG.error("Could not handle chunked GELF message", e.getCause());
+ e.getChannel().close();
+ }
+
+
+}
diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/GELFMainThread.java b/src/main/java/org/graylog2/messagehandlers/gelf/GELFMainThread.java
deleted file mode 100644
index d3fefd1e032f..000000000000
--- a/src/main/java/org/graylog2/messagehandlers/gelf/GELFMainThread.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Copyright 2010 Lennart Koopmann
- *
- * This file is part of Graylog2.
- *
- * Graylog2 is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * Graylog2 is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Graylog2. If not, see .
- *
- */
-
-package org.graylog2.messagehandlers.gelf;
-
-import org.apache.log4j.Logger;
-
-import java.net.DatagramPacket;
-import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * GELFMainThread.java: Jun 23, 2010 6:43:21 PM
- *
- * Thread responsible for listening for GELF messages.
- *
- * @author Lennart Koopmann
- */
-public class GELFMainThread extends Thread {
-
- private static final Logger LOG = Logger.getLogger(GELFMainThread.class);
-
- private InetSocketAddress socketAddress;
-
- private ExecutorService threadPool = Executors.newCachedThreadPool();
-
- /**
- * Thread responsible for listening for GELF messages.
- *
- * @param socketAddress The {@link InetSocketAddress} to bind to
- */
- public GELFMainThread(InetSocketAddress socketAddress) {
- this.socketAddress = socketAddress;
- }
-
- /**
- * Run the thread. Runs forever!
- */
- @Override public void run() {
- GELFServer server = new GELFServer();
- if (!server.create(socketAddress)) {
- throw new RuntimeException("Could not start GELF server. Do you have permissions to bind to " + socketAddress + "?");
- }
-
- // Run forever.
- while (true) {
- try {
- // Listen on socket.
- DatagramPacket receivedGelfSentence = server.listen();
-
- // Skip empty sentences.
- if (receivedGelfSentence.getLength() == 0) {
- continue;
- }
-
- // We got a connected client. Start a GELFClientHandlerThread() in our thread pool and wait for next client.
- threadPool.execute(new GELFClientHandlerThread(receivedGelfSentence));
- } catch (Exception e) {
- LOG.error("Skipping GELF client. Error: " + e.getMessage(), e);
- }
- }
- }
-
-}
diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/GELFServer.java b/src/main/java/org/graylog2/messagehandlers/gelf/GELFServer.java
index 1a7cf03da18a..3d06353e4051 100644
--- a/src/main/java/org/graylog2/messagehandlers/gelf/GELFServer.java
+++ b/src/main/java/org/graylog2/messagehandlers/gelf/GELFServer.java
@@ -20,12 +20,25 @@
package org.graylog2.messagehandlers.gelf;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.Delimiters;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpContentCompressor;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
/**
* GELFThread.java: Jun 23, 2010 6:58:07 PM
@@ -43,44 +56,86 @@ public class GELFServer {
*/
public static final int MAX_PACKET_SIZE = 8192;
- private DatagramSocket serverSocket = null;
-
/**
- * Create the UDP socket.
+ * Create the configured servers to accept GELF messages on: UDP and optionally nul-byte delimited TCP, and HTTP.
*
* @param socketAddress The {@link InetSocketAddress} to bind to
- * @return boolean
+ * @param useTcp true if the TCP server should be started as well
+ * @param httpListenPort if non-zero the port on which to listen to HTTP requests
+ * @return boolean true if all requested servers could be bound, false otherwise
*/
- public boolean create(InetSocketAddress socketAddress) {
+ public static boolean create(InetSocketAddress socketAddress, boolean useTcp, int httpListenPort) {
+ final ExecutorService bossThreadPool = Executors.newCachedThreadPool();
+ final ExecutorService workerThreadPool = Executors.newCachedThreadPool();
+
+ // UDP GELF server
+
+ final ConnectionlessBootstrap udpBootstrap = new ConnectionlessBootstrap(
+ new NioDatagramChannelFactory(workerThreadPool)
+ );
+ udpBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ return Channels.pipeline(new UdpGELFHandler());
+ }
+ });
+
+ // TCP GELF server
+ ServerBootstrap tcpBootstrap = null;
+ if (useTcp) {
+ tcpBootstrap = new ServerBootstrap(
+ new NioServerSocketChannelFactory(
+ bossThreadPool, workerThreadPool)
+ );
+ tcpBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline p = Channels.pipeline();
+ p.addLast("framer", new DelimiterBasedFrameDecoder(2 * 1024 * 1024,
+ Delimiters.nulDelimiter()));
+ p.addLast("handler", new TcpGELFHandler());
+ return p;
+ }
+ });
+ }
+
+ // HTTP GELF server
+ ServerBootstrap httpBootstrap = null;
+ if (httpListenPort != 0) {
+ httpBootstrap = new ServerBootstrap(
+ new NioServerSocketChannelFactory(bossThreadPool, workerThreadPool)
+ );
+ httpBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = Channels.pipeline();
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
+ pipeline.addLast("encoder", new HttpResponseEncoder());
+ pipeline.addLast("deflater", new HttpContentCompressor());
+ pipeline.addLast("handler", new HttpGELFHandler());
+
+ return pipeline;
+ }
+ });
+ }
try {
- this.serverSocket = new DatagramSocket(socketAddress);
- } catch(IOException e) {
- LOG.fatal("Could not create ServerSocket in GELFServer::create(): " + e.getMessage(), e);
+ udpBootstrap.bind(socketAddress);
+ LOG.info("Started UDP GELF server on " + socketAddress);
+ if (useTcp) {
+ tcpBootstrap.bind(socketAddress);
+ LOG.info("Started TCP GELF server on " + socketAddress);
+ }
+ if (httpListenPort != 0) {
+ httpBootstrap.bind(new InetSocketAddress(socketAddress.getAddress(), httpListenPort));
+ LOG.info("Started HTTP GELF server on " + socketAddress.getAddress() + ":" + httpListenPort);
+ }
+ } catch (ChannelException e) {
+ LOG.fatal("Could not bind GELF server to address " + socketAddress, e);
return false;
}
- LOG.info("Started GELF server on " + socketAddress);
-
return true;
}
- /**
- * Listens on the formerly created (create()) socket and returns
- * unzipped (GZIP) raw message that can be parsed to a GELFMessage.
- *
- * @return Received message
- * @throws IOException
- */
- public DatagramPacket listen() throws IOException {
-
- // Create buffer.
- byte[] receiveData = new byte[MAX_PACKET_SIZE];
- DatagramPacket receivedPacket = new DatagramPacket(receiveData, receiveData.length);
-
- // Reveive and fill buffer.
- serverSocket.receive(receivedPacket);
-
- return receivedPacket;
- }
-
}
diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/HttpGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/HttpGELFHandler.java
new file mode 100644
index 000000000000..9126c648bca0
--- /dev/null
+++ b/src/main/java/org/graylog2/messagehandlers/gelf/HttpGELFHandler.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright 2012 Kay Roepke
+ *
+ * This file is part of Graylog2.
+ *
+ * Graylog2 is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Graylog2 is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Graylog2. If not, see .
+ *
+ */
+
+package org.graylog2.messagehandlers.gelf;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.jboss.netty.util.CharsetUtil;
+
+public class HttpGELFHandler extends SimpleChannelUpstreamHandler {
+ private static final Logger LOG = Logger.getLogger(HttpGELFHandler.class);
+
+ @Override
+ public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent event)
+ throws Exception {
+ final HttpRequest request = (HttpRequest) event.getMessage();
+ final ChannelBuffer content = request.getContent();
+ final String receivedGelfSentence = content.toString(CharsetUtil.UTF_8);
+ LOG.info("received http message: " + receivedGelfSentence);
+ new SimpleGELFClientHandler(receivedGelfSentence).handle();
+
+ // Build the response object.
+ final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.ACCEPTED);
+ response.setContent(ChannelBuffers.copiedBuffer("", CharsetUtil.UTF_8));
+ response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
+
+ final ChannelFuture future = event.getChannel().write(response);
+ future.addListener(ChannelFutureListener.CLOSE);
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e)
+ throws Exception {
+ LOG.error("Exception caught while reading HTTP GELF message", e.getCause());
+ e.getChannel().close();
+ }
+
+}
diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFClientHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFClientHandler.java
index 7215003119fa..8ba8263c2598 100644
--- a/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFClientHandler.java
+++ b/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFClientHandler.java
@@ -20,6 +20,11 @@
package org.graylog2.messagehandlers.gelf;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.DatagramPacket;
+import java.util.zip.DataFormatException;
+
import org.apache.log4j.Logger;
import org.graylog2.Tools;
import org.graylog2.blacklists.Blacklist;
@@ -28,11 +33,6 @@
import org.graylog2.messagehandlers.common.MessageCountUpdateHook;
import org.graylog2.messagehandlers.common.MessageParserHook;
import org.graylog2.messagehandlers.common.ReceiveHookManager;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.DatagramPacket;
-import java.util.zip.DataFormatException;
import org.graylog2.messagequeue.MessageQueue;
/**
@@ -93,6 +93,7 @@ public SimpleGELFClientHandler(Object clientMessage) throws DataFormatException,
}
}
+
/**
* Handles the client: Decodes JSON, Stores in Indexer, ReceiveHooks
diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFHandler.java
new file mode 100644
index 000000000000..db9c5979e848
--- /dev/null
+++ b/src/main/java/org/graylog2/messagehandlers/gelf/SimpleGELFHandler.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright 2012 Kay Roepke
+ *
+ * This file is part of Graylog2.
+ *
+ * Graylog2 is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Graylog2 is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Graylog2. If not, see .
+ *
+ */
+
+package org.graylog2.messagehandlers.gelf;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.util.CharsetUtil;
+
+public class SimpleGELFHandler extends SimpleChannelUpstreamHandler {
+ private static final Logger LOG = Logger.getLogger(SimpleGELFClientHandler.class);
+
+ @Override
+ public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e)
+ throws Exception {
+ final ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+ new SimpleGELFClientHandler(buffer.toString(CharsetUtil.UTF_8)).handle();
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e)
+ throws Exception {
+ LOG.error("Could not handle single GELF message", e.getCause());
+ e.getChannel().close();
+ }
+
+
+}
diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/TcpGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/TcpGELFHandler.java
new file mode 100644
index 000000000000..1f0a1aad4929
--- /dev/null
+++ b/src/main/java/org/graylog2/messagehandlers/gelf/TcpGELFHandler.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright 2012 Kay Roepke
+ *
+ * This file is part of Graylog2.
+ *
+ * Graylog2 is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Graylog2 is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Graylog2. If not, see .
+ *
+ */
+
+package org.graylog2.messagehandlers.gelf;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.handler.codec.compression.ZlibDecoder;
+import org.jboss.netty.handler.codec.compression.ZlibWrapper;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+public class TcpGELFHandler extends FrameDecoder {
+ private static final Logger LOG = Logger.getLogger(TcpGELFHandler.class);
+
+ // TODO refactor, duplicated across a couple of places now.
+ @Override
+ protected Object decode(ChannelHandlerContext ctx, Channel channel,
+ ChannelBuffer buffer) throws Exception {
+ if (buffer.readableBytes() < 2) {
+ return null;
+ }
+ final int magic1 = buffer.getUnsignedByte(buffer.readerIndex());
+ final int magic2 = buffer.getUnsignedByte(buffer.readerIndex() + 1);
+ final byte[] data = new byte[] {(byte) magic1, (byte) magic2};
+
+ switch (GELF.getGELFType(data)) {
+ case GELF.TYPE_GZIP:
+ enableGzip(ctx);
+ break;
+ case GELF.TYPE_ZLIB:
+ enableZlib(ctx);
+ break;
+ default: {
+ // Unknown protocol; discard everything and close the connection.
+ LOG.info("unknown protocol for GELF message, discarding");
+ buffer.skipBytes(buffer.readableBytes());
+ ctx.getChannel().close();
+ return null;
+ }
+ }
+ return buffer.readBytes(buffer.readableBytes());
+ }
+
+ private void enableZlib(final ChannelHandlerContext ctx) {
+ LOG.debug("enabling handling of zlib gelf messages on this connection");
+
+ final ChannelPipeline pipeline = ctx.getPipeline();
+ pipeline.addLast("zlibdeflater", new ZlibDecoder(ZlibWrapper.ZLIB));
+ pipeline.addLast("handler", new SimpleGELFHandler());
+ pipeline.remove(this);
+ }
+
+ private void enableGzip(final ChannelHandlerContext ctx) {
+ LOG.debug("enabling handling of gzip gelf messages on this connection");
+
+ final ChannelPipeline pipeline = ctx.getPipeline();
+ pipeline.addLast("gzipdeflater", new ZlibDecoder(ZlibWrapper.GZIP));
+ pipeline.addLast("handler", new SimpleGELFHandler());
+ pipeline.remove(this);
+ }
+
+}
diff --git a/src/main/java/org/graylog2/messagehandlers/gelf/UdpGELFHandler.java b/src/main/java/org/graylog2/messagehandlers/gelf/UdpGELFHandler.java
new file mode 100644
index 000000000000..b50c1d76ce2f
--- /dev/null
+++ b/src/main/java/org/graylog2/messagehandlers/gelf/UdpGELFHandler.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright 2012 Kay Roepke
+ *
+ * This file is part of Graylog2.
+ *
+ * Graylog2 is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Graylog2 is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Graylog2. If not, see .
+ *
+ */
+
+package org.graylog2.messagehandlers.gelf;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.handler.codec.compression.ZlibDecoder;
+import org.jboss.netty.handler.codec.compression.ZlibWrapper;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+public class UdpGELFHandler extends FrameDecoder {
+ private static final Logger LOG = Logger.getLogger(UdpGELFHandler.class);
+
+ @Override
+ protected Object decode(final ChannelHandlerContext ctx, final Channel channel,
+ final ChannelBuffer buffer) throws Exception {
+ if (buffer.readableBytes() < 2) {
+ return null;
+ }
+ final int magic1 = buffer.getUnsignedByte(buffer.readerIndex());
+ final int magic2 = buffer.getUnsignedByte(buffer.readerIndex() + 1);
+ final byte[] data = new byte[] {(byte) magic1, (byte) magic2};
+
+ switch (GELF.getGELFType(data)) {
+ case GELF.TYPE_GZIP:
+ enableGzip(ctx);
+ break;
+ case GELF.TYPE_ZLIB:
+ enableZlib(ctx);
+ break;
+ case GELF.TYPE_CHUNKED:
+ enableChunked(ctx);
+ break;
+ default: {
+ // Unknown protocol; discard everything and close the connection.
+ LOG.info("unknown protocol for GELF message, discarding");
+ buffer.skipBytes(buffer.readableBytes());
+ ctx.getChannel().close();
+ return null;
+ }
+ }
+ return buffer.readBytes(buffer.readableBytes());
+ }
+
+ private void enableChunked(final ChannelHandlerContext ctx) {
+ LOG.info("enabling handling of chunked gelf messages on this connection");
+ final ChannelPipeline pipeline = ctx.getPipeline();
+ pipeline.addLast("chunked", new ChunkedGELFHandler());
+ pipeline.remove(this);
+ }
+
+ private void enableZlib(final ChannelHandlerContext ctx) {
+ LOG.info("enabling handling of zlib gelf messages on this connection");
+
+ final ChannelPipeline pipeline = ctx.getPipeline();
+ pipeline.addLast("zlibdeflater", new ZlibDecoder(ZlibWrapper.ZLIB));
+ pipeline.addLast("handler", new SimpleGELFHandler());
+ pipeline.remove(this);
+ }
+
+ private void enableGzip(final ChannelHandlerContext ctx) {
+ LOG.info("enabling handling of gzip gelf messages on this connection");
+
+ final ChannelPipeline pipeline = ctx.getPipeline();
+ pipeline.addLast("gzipdeflater", new ZlibDecoder(ZlibWrapper.GZIP));
+ pipeline.addLast("handler", new SimpleGELFHandler());
+ pipeline.remove(this);
+ }
+}