Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/gelf compression #14

Merged
merged 2 commits into from
Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions conf/janus.eventhandler.gelfevh.jcfg.sample
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# This configures the gelf event handler. Appending necessary headers
# and sending messages via TCP or UDP (maybe quic soon :D)
# This configures the GELF event handler. Appending necessary headers
# and sending messages via TCP or UDP

general: {
enabled = false # By default the module is not enabled
enabled = false # By default the module is not enabled
events = "all"
# Comma separated list of the events mask you're interested
# in. Valid values are none, sessions, handles, jsep, webrtc,
Expand All @@ -11,11 +11,11 @@ general: {

backend = "your.gralog.server"
port = "12201"
protocol = "tcp" # tcp or udp transport type
max_message_len = 1024 # Note that we add 12 bytes of headers + standard UDP headers (8 bytes)
# when calculating packet size based on MTU
protocol = "tcp" # tcp or udp transport type
max_message_len = 1024 # Note that we add 12 bytes of headers + standard UDP headers (8 bytes)
# when calculating packet size based on MTU

#compress = true # Optionally, only for UDP transport, JSON messages can be compressed using zlib
#compression = 9 # In case, you can specify the compression factor, where 1 is
#compress = true # Optionally, only for UDP transport, JSON messages can be compressed using zlib
#compression = 9 # In case, you can specify the compression factor, where 1 is
# the fastest (low compression), and 9 gives the best compression
}
74 changes: 40 additions & 34 deletions events/janus_gelfevh.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
* \copyright GNU General Public License v3
* \brief Janus GelfEventHandler plugin
* \details This is a GELF event handler plugin for Janus, which is supposed
* to send json events to GELF (Graylog logger). Necessary headers are prepended.
* For sending, you can use TCP which is not recomanded in case there will be
* to send json events to GELF
* (Graylog logger https://docs.graylog.org/en/3.2/pages/gelf.html).
* Necessary headers are prepended.
* For sending, you can use TCP which is not recommended in case there will be
* a lot of messages. There is also UDP support, but you need to limit the payload
* size with max_message_len + remember to leave space for 12 bytes for special
* size with max_message_len + remember to leave room for 12 bytes for special
* headers. UDP messages will be chunked automatically.
* There is also compression available for UDP protocol, to save network bandwith
* for cpu sacrifice. This is not available for TCP due to GELF limitations
* while using a bit more CPU. This is not available for TCP due to GELF limitations
*
* \ingroup eventhandlers
* \ref eventhandlers
Expand All @@ -26,7 +28,6 @@
#include "../events.h"
#include <netdb.h>
#include <errno.h>
#include <sys/time.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
Expand All @@ -37,7 +38,7 @@
#define JANUS_GELFEVH_VERSION_STRING "0.0.1"
#define JANUS_GELFEVH_DESCRIPTION "This is event handler plugin for Janus, which forwards events via TCP/UDP to GELF server."
#define JANUS_GELFEVH_NAME "JANUS GelfEventHandler plugin"
#define JANUS_GELFEVH_AUTHOR "Meetecho s.r.l."
#define JANUS_GELFEVH_AUTHOR "Mirko Brankovic <[email protected]>"
#define JANUS_GELFEVH_PACKAGE "janus.eventhandler.gelfevh"

#define MAX_GELF_CHUNKS 128
Expand Down Expand Up @@ -107,20 +108,26 @@ static void janus_gelfevh_event_free(json_t *event) {
/* GELF backend to send the events to */
static char *backend = NULL;
static char *port = NULL;

typedef enum janus_gelfevh_socket_type {
TCP = 1,
UDP = 2
} janus_gelfevh_socket_type;

static int max_gelf_msg_len = 500;
static int janus_gelfevh_socket_type = 1;
static int sockfd;
/* Set TCP as Default transport */
static janus_gelfevh_socket_type transport = TCP;

/* Parameter validation (for tweaking via Admin API) */
static struct janus_json_parameter request_parameters[] = {
{"request", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}
};
{"request", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}};
static struct janus_json_parameter tweak_parameters[] = {
{"events", JSON_STRING, 0},
{"backend", JSON_STRING, 0},
{"port", JSON_STRING, 0},
{"max_gelf_msg_len", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
{"janus_gelfevh_socket_type", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}
{"janus_gelfevh_socket_type", JSON_STRING, 0}
};
/* Error codes (for the tweaking via Admin API */
#define JANUS_GELFEVH_ERROR_INVALID_REQUEST 411
Expand Down Expand Up @@ -152,8 +159,8 @@ static int janus_gelfevh_connect(void) {
struct sockaddr_in servaddr;

if(getaddrinfo(backend, NULL, NULL, &res) != 0 ||
janus_network_address_from_sockaddr(res->ai_addr, &addr) != 0 ||
janus_network_address_to_string_buffer(&addr, &addr_buf) != 0) {
janus_network_address_from_sockaddr(res->ai_addr, &addr) != 0 ||
janus_network_address_to_string_buffer(&addr, &addr_buf) != 0) {
if(res)
freeaddrinfo(res);
JANUS_LOG(LOG_ERR, "Could not resolve address (%s)...\n", backend);
Expand All @@ -162,7 +169,7 @@ static int janus_gelfevh_connect(void) {
const char *host = g_strdup(janus_network_address_string_from_buffer(&addr_buf));
freeaddrinfo(res);

if((sockfd = socket(AF_INET, janus_gelfevh_socket_type, 0)) < 0 ) {
if((sockfd = socket(AF_INET, transport, 0)) < 0 ) {
JANUS_LOG(LOG_ERR, "Socket creation failed: %s\n", strerror(errno));
return -1;
}
Expand All @@ -186,11 +193,11 @@ static int janus_gelfevh_send(char *message) {
JANUS_LOG(LOG_WARN, "Message is NULL, not sending to GELF!\n");
return -1;
}
/* TCP */
if(janus_gelfevh_socket_type == 1) {
if(transport == TCP) {
/* TCP */
unsigned int out_bytes = 0;
while (out_bytes < strlen(message)+1) {
int n = write(sockfd, message, strlen(message) + 1);
int n = send(sockfd, message, strlen(message) + 1, 0);
if (n < 0){
JANUS_LOG(LOG_WARN, "Unable to send message: %s\n", strerror(errno));
close(sockfd);
Expand All @@ -203,9 +210,9 @@ static int janus_gelfevh_send(char *message) {
out_bytes += n;
}
}
/* UDP chunking with headers */

} else {
/* Check if we need to compress the data */
/* UDP chunking with headers. Check if we need to compress the data */
int len = strlen(message);
char *buf = message;
if(compress) {
Expand All @@ -215,7 +222,7 @@ static int janus_gelfevh_send(char *message) {
message, strlen(message),
compressed_text, sizeof(compressed_text));
if(compressed_len == 0) {
JANUS_LOG(LOG_ERR, "Failed to compress event (%zu bytes)...\n", strlen(message));
JANUS_LOG(LOG_WARN, "Failed to compress event (%zu bytes). Sending message uncompressed\n", strlen(message));
/* Sending message uncompressed */
} else {
len = compressed_len;
Expand All @@ -225,14 +232,14 @@ static int janus_gelfevh_send(char *message) {

int total = len / max_gelf_msg_len + 1;
if (total > MAX_GELF_CHUNKS) {
JANUS_LOG(LOG_WARN, "GELF allows %d number of chunks, try increasing max_gelf_msg_len\n", MAX_GELF_CHUNKS);
JANUS_LOG(LOG_ERR, "Event not sent! GELF allows %d number of chunks, try increasing max_gelf_msg_len\n", MAX_GELF_CHUNKS);
return -1;
}
/* do we need to chunk the message */
if(total == 1) {
int n = write(sockfd, buf, len);
int n = send(sockfd, buf, len, 0);
if(n < 0) {
JANUS_LOG(LOG_WARN, "Sending UDP message failed: %s \n", strerror(errno));
JANUS_LOG(LOG_ERR, "Sending UDP message failed, dropping event: %s \n", strerror(errno));
return -1;
}
return 1;
Expand All @@ -241,7 +248,7 @@ static int janus_gelfevh_send(char *message) {
char *rnd = randstring(8);
for (int i = 0; i < total; i++) {
int bytesToSend = offset + max_gelf_msg_len < len ? max_gelf_msg_len : len - offset;
/* prepend the necessary headers (imitate TCP) */
/* Prepend the necessary headers (imitate TCP) */
char chunk[bytesToSend + 12];
chunk[0] = 0x1e;
chunk[1] = 0x0f;
Expand All @@ -251,7 +258,7 @@ static int janus_gelfevh_send(char *message) {
char *head = chunk;
memcpy(head+12, buf, bytesToSend);
buf += bytesToSend;
int n = write(sockfd, head, bytesToSend + 12);
int n = send(sockfd, head, bytesToSend + 12, 0);
if(n < 0) {
JANUS_LOG(LOG_WARN, "Sending UDP message failed: %s \n", strerror(errno));
return -1;
Expand Down Expand Up @@ -314,13 +321,16 @@ int janus_gelfevh_init(const char *config_path) {
item = janus_config_get(config, config_general, janus_config_type_item, "protocol");
if (item && item->value) {
if (strcasecmp(item->value, "udp") == 0){
janus_gelfevh_socket_type = 2;
transport = UDP;
}
}
item = janus_config_get(config, config_general, janus_config_type_item, "max_message_len");
if (item && item->value) {
int mml = atoi(item->value);
max_gelf_msg_len = mml;
if(atoi(item->value) > 0) {
JANUS_LOG(LOG_WARN, "Missing or invalid max_message_len, using default: %d\n", max_gelf_msg_len);
} else {
max_gelf_msg_len = atoi(item->value);
}
}
/* Which events should we subscribe to? */
item = janus_config_get(config, config_general, janus_config_type_item, "events");
Expand Down Expand Up @@ -489,7 +499,7 @@ json_t *janus_gelfevh_handle_request(json_t *request) {
if (json_object_get(request, "max_message_len"))
max_gelf_msg_len = json_integer_value(json_object_get(request, "max_message_len"));
if (strcasecmp(json_string_value(json_object_get(request, "protocol")), "udp") == 0){
janus_gelfevh_socket_type = 2;
transport = UDP;
}
if(!req_backend || !req_port) {
/* Invalid backend address or port */
Expand Down Expand Up @@ -556,12 +566,8 @@ static void *janus_gelfevh_handler(void *data) {
if(microtimestamp && json_is_integer(microtimestamp)) {
double created_timestamp = (double)json_integer_value(microtimestamp) / 1000000;
json_object_set(output, "timestamp", json_real(created_timestamp));
}
else {
struct timeval t;
gettimeofday(&t, NULL);
double micro_timestamp = (double)(1000000 * t.tv_sec + t.tv_usec) / 1000000;
json_object_set(output, "timestamp", json_real(micro_timestamp));
} else {
json_object_set(output, "timestamp", json_real(janus_get_real_time()));
}
json_object_set(output, "host", json_object_get(event, "emitter"));
json_object_set(output, "version", json_string("1.1"));
Expand Down