Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an Apache Kafka output module #162

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ AC_ARG_ENABLE(man-pages,
[ --disable-man-pages Disable building the man pages (default: on)]
,,enable_man_pages="yes")

AC_ARG_ENABLE(kafka,
[ --disable-kafka Disable Kafka support (default: auto)]
,,enable_kafka="auto")

AC_ARG_WITH(librdkafka,
AC_HELP_STRING([--with-librdkafka=DIR],
[use librdkafka library from (prefix) directory DIR]),,)
patheval()
{
OLD=$1
Expand Down Expand Up @@ -982,6 +989,40 @@ if test "x$enable_redis" != "xno" && test "x$with_redis" != "no"; then
enable_redis=$hiredis
fi

dnl ***************************************************************************
dnl librdkafka headers/libraries
dnl ***************************************************************************
if test "x$enable_kafka" != "xno" && test "x$with_kafka" != "xno"; then
rdkafka="yes"
if test "x$with_librdkafka" != "xyes"; then
CFLAGS_SAVE="$CFLAGS"
LDFLAGS_SAVE="$LDFLAGS"
CFLAGS="$CFLAGS -I$with_librdkafka/include"
LDFLAGS="$LDFLAGS -L$with_librdkafka/lib"
AC_CHECK_HEADER(librdkafka/rdkafka.h, [RDKAFKA_CFLAGS="-I$with_librdkafka/include"
RDKAFKA_LIBS="-L$with_librdkafka/lib -lrdkafka"], [rdkafka=no])
CFLAGS="$CFLAGS_SAVE"
LDFLAGS="$LDFLAGS_SAVE"
else
CFLAGS_SAVE="$CFLAGS"
LDFLAGS_SAVE="$LDFLAGS"
CFLAGS="$CFLAGS -I$with_librdkafka/include"
LDFLAGS="$LDFLAGS -L$with_librdkafka/lib"
AC_CHECK_HEADER(librdkafka/rdkafka.h, [RDKAFKA_CFLAGS="-I$with_librdkafka/include"
RDKAFKA_LIBS="-L$with_librdkafka/lib -lrdkafka"], [rdkafka=no])
CFLAGS="$CFLAGS_SAVE"
LDFLAGS="$LDFLAGS_SAVE"
fi

if test "x$enable_kafka" = "xyes" && test "x$rdkafka" = "xno"; then
AC_MSG_ERROR(librdkafka not found)
fi

enable_kafka=$rdkafka
echo "libs: $RDKAFKA_LIBS"
fi


dnl ***************************************************************************
dnl rabbitmq-c headers/libraries
dnl ***************************************************************************
Expand Down Expand Up @@ -1278,6 +1319,7 @@ AM_CONDITIONAL(JSON_INTERNAL, [test "x$JSON_SUBDIRS" != "x"])
AM_CONDITIONAL(LIBMONGO_INTERNAL, [test "x$LIBMONGO_SUBDIRS" != "x"])
AM_CONDITIONAL(LIBRABBITMQ_INTERNAL, [test "x$LIBRABBITMQ_SUBDIRS" != "x"])
AM_CONDITIONAL(ENABLE_MAN_PAGES, [test "x$enable_man_pages" = "xyes"])
AM_CONDITIONAL(ENABLE_KAFKA, [test "x$enable_kafka" = "xyes"])

# substitution into manual pages
expanded_sysconfdir=[`patheval $sysconfdir | sed -e 's/-/\\\\-/g'`]
Expand Down Expand Up @@ -1377,3 +1419,4 @@ echo " AMQP destination (module) : ${enable_amqp:=no}"
echo " STOMP destination (module) : ${enable_stomp:=no}"
echo " GEOIP support (module) : ${enable_geoip:=no}"
echo " Redis support (module) : ${enable_redis:=no}"
echo " Kafka support (module) : ${enable_kafka:=no}"
7 changes: 4 additions & 3 deletions lib/stats/stats-cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ _get_module_name(gint source)
"stomp",
"redis",
"snmp",
"kafka",
};
return module_names[source & SCS_SOURCE_MASK];
}
Expand Down Expand Up @@ -119,7 +120,7 @@ stats_cluster_get_component_name(StatsCluster *self, gchar *buf, gsize buf_len)
}
else
{
g_snprintf(buf, buf_len, "%s%s",
g_snprintf(buf, buf_len, "%s%s",
_get_component_prefix(self->component),
_get_module_name(self->component));
return buf;
Expand Down Expand Up @@ -164,7 +165,7 @@ StatsCluster *
stats_cluster_new(gint component, const gchar *id, const gchar *instance)
{
StatsCluster *self = g_new0(StatsCluster, 1);

self->component = component;
self->id = g_strdup(id ? : "");
self->instance = g_strdup(instance ? : "");
Expand All @@ -174,7 +175,7 @@ stats_cluster_new(gint component, const gchar *id, const gchar *instance)

void
stats_cluster_free(StatsCluster *self)
{
{
g_free(self->id);
g_free(self->instance);
g_free(self);
Expand Down
3 changes: 2 additions & 1 deletion lib/stats/stats-cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ enum
/* direction bits, used to distinguish between source/destination drivers */
SCS_SOURCE = 0x0100,
SCS_DESTINATION = 0x0200,

/* drivers, this should be registered dynamically */
SCS_FILE = 1,
SCS_PIPE = 2,
Expand Down Expand Up @@ -76,6 +76,7 @@ enum
SCS_STOMP = 30,
SCS_REDIS = 31,
SCS_SNMP = 32,
SCS_KAFKA = 33,
SCS_MAX,
SCS_SOURCE_MASK = 0xff
};
Expand Down
3 changes: 2 additions & 1 deletion modules/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ include modules/tfgeoip/Makefile.am
include modules/afstomp/Makefile.am
include modules/redis/Makefile.am
include modules/pseudofile/Makefile.am
include modules/kafka/Makefile.am

SYSLOG_NG_MODULES = \
mod-afsocket mod-afstreams mod-affile mod-afprog \
mod-usertty mod-amqp mod-mongodb mod-smtp mod-json \
mod-syslogformat mod-linux-kmsg mod-pacctformat \
mod-confgen mod-system-source mod-csvparser mod-dbparser \
mod-basicfuncs mod-cryptofuncs mod-geoip mod-afstomp \
mod-redis mod-pseudofile
mod-redis mod-pseudofile mod-kafka

modules modules/: ${SYSLOG_NG_MODULES}

Expand Down
36 changes: 36 additions & 0 deletions modules/kafka/Makefile.am
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
if ENABLE_KAFKA
module_LTLIBRARIES += \
modules/kafka/libkafka.la

modules_kafka_libkafka_la_CFLAGS = \
$(RDKAFKA_CFLAGS) \
-I$(top_srcdir)/modules/kafka \
-I$(top_builddir)/modules/kafka
modules_kafka_libkafka_la_SOURCES = \
modules/kafka/kafka-grammar.y \
modules/kafka/kafka.c \
modules/kafka/kafka.h \
modules/kafka/kafka-parser.h \
modules/kafka/kafka-parser.c

modules_kafka_libkafka_la_LIBADD = \
-lrdkafka $(RDKAFKA_LIBS) $(MODULE_DEPS_LIBS)
modules_kafka_libkafka_la_LDFLAGS = \
$(MODULE_LDFLAGS)
modules_kafka_libkafka_la_DEPENDENCIES = \
$(MODULE_DEPS_LIBS)

modules/kafka modules/kafka/ mod-kafka: \
modules/kafka/libkafka.la
else
modules/kafka modules/kafka/ mod-kafka:
endif

BUILT_SOURCES += \
modules/kafka/kafka-grammar.y \
modules/kafka/kafka-grammar.c \
modules/kafka/kafka-grammar.h
EXTRA_DIST += \
modules/kafka/kafka-grammar.ym

.PHONY: modules/kafka/ mod-kafka
110 changes: 110 additions & 0 deletions modules/kafka/crc32.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* COPYRIGHT (C) 1986 Gary S. Brown. You may use this program, or
* code or tables extracted from it, as desired without restriction.
*
* First, the polynomial itself and its table of feedback terms. The
* polynomial is
* X^32+X^26+X^23+X^22+X^16+X^12+X^11+X^10+X^8+X^7+X^5+X^4+X^2+X^1+X^0
*
* Note that we take it "backwards" and put the highest-order term in
* the lowest-order bit. The X^32 term is "implied"; the LSB is the
* X^31 term, etc. The X^0 term (usually shown as "+1") results in
* the MSB being 1
*
* Note that the usual hardware shift register implementation, which
* is what we're using (we're merely optimizing it by doing eight-bit
* chunks at a time) shifts bits into the lowest-order term. In our
* implementation, that means shifting towards the right. Why do we
* do it this way? Because the calculated CRC must be transmitted in
* order from highest-order term to lowest-order term. UARTs transmit
* characters in order from LSB to MSB. By storing the CRC this way
* we hand it to the UART in the order low-byte to high-byte; the UART
* sends each low-bit to hight-bit; and the result is transmission bit
* by bit from highest- to lowest-order term without requiring any bit
* shuffling on our part. Reception works similarly
*
* The feedback terms table consists of 256, 32-bit entries. Notes
*
* The table can be generated at runtime if desired; code to do so
* is shown later. It might not be obvious, but the feedback
* terms simply represent the results of eight shift/xor opera
* tions for all combinations of data and CRC register values
*
* The values must be right-shifted by eight bits by the "updcrc
* logic; the shift must be unsigned (bring in zeroes). On some
* hardware you could probably optimize the shift in assembler by
* using byte-swap instructions
* polynomial $edb88320
*/

#include <sys/types.h>

u_int32_t crc32(const u_char *, size_t);
static unsigned int crc32_tab[] = {
0x00000000L, 0x77073096L, 0xee0e612cL, 0x990951baL, 0x076dc419L,
0x706af48fL, 0xe963a535L, 0x9e6495a3L, 0x0edb8832L, 0x79dcb8a4L,
0xe0d5e91eL, 0x97d2d988L, 0x09b64c2bL, 0x7eb17cbdL, 0xe7b82d07L,
0x90bf1d91L, 0x1db71064L, 0x6ab020f2L, 0xf3b97148L, 0x84be41deL,
0x1adad47dL, 0x6ddde4ebL, 0xf4d4b551L, 0x83d385c7L, 0x136c9856L,
0x646ba8c0L, 0xfd62f97aL, 0x8a65c9ecL, 0x14015c4fL, 0x63066cd9L,
0xfa0f3d63L, 0x8d080df5L, 0x3b6e20c8L, 0x4c69105eL, 0xd56041e4L,
0xa2677172L, 0x3c03e4d1L, 0x4b04d447L, 0xd20d85fdL, 0xa50ab56bL,
0x35b5a8faL, 0x42b2986cL, 0xdbbbc9d6L, 0xacbcf940L, 0x32d86ce3L,
0x45df5c75L, 0xdcd60dcfL, 0xabd13d59L, 0x26d930acL, 0x51de003aL,
0xc8d75180L, 0xbfd06116L, 0x21b4f4b5L, 0x56b3c423L, 0xcfba9599L,
0xb8bda50fL, 0x2802b89eL, 0x5f058808L, 0xc60cd9b2L, 0xb10be924L,
0x2f6f7c87L, 0x58684c11L, 0xc1611dabL, 0xb6662d3dL, 0x76dc4190L,
0x01db7106L, 0x98d220bcL, 0xefd5102aL, 0x71b18589L, 0x06b6b51fL,
0x9fbfe4a5L, 0xe8b8d433L, 0x7807c9a2L, 0x0f00f934L, 0x9609a88eL,
0xe10e9818L, 0x7f6a0dbbL, 0x086d3d2dL, 0x91646c97L, 0xe6635c01L,
0x6b6b51f4L, 0x1c6c6162L, 0x856530d8L, 0xf262004eL, 0x6c0695edL,
0x1b01a57bL, 0x8208f4c1L, 0xf50fc457L, 0x65b0d9c6L, 0x12b7e950L,
0x8bbeb8eaL, 0xfcb9887cL, 0x62dd1ddfL, 0x15da2d49L, 0x8cd37cf3L,
0xfbd44c65L, 0x4db26158L, 0x3ab551ceL, 0xa3bc0074L, 0xd4bb30e2L,
0x4adfa541L, 0x3dd895d7L, 0xa4d1c46dL, 0xd3d6f4fbL, 0x4369e96aL,
0x346ed9fcL, 0xad678846L, 0xda60b8d0L, 0x44042d73L, 0x33031de5L,
0xaa0a4c5fL, 0xdd0d7cc9L, 0x5005713cL, 0x270241aaL, 0xbe0b1010L,
0xc90c2086L, 0x5768b525L, 0x206f85b3L, 0xb966d409L, 0xce61e49fL,
0x5edef90eL, 0x29d9c998L, 0xb0d09822L, 0xc7d7a8b4L, 0x59b33d17L,
0x2eb40d81L, 0xb7bd5c3bL, 0xc0ba6cadL, 0xedb88320L, 0x9abfb3b6L,
0x03b6e20cL, 0x74b1d29aL, 0xead54739L, 0x9dd277afL, 0x04db2615L,
0x73dc1683L, 0xe3630b12L, 0x94643b84L, 0x0d6d6a3eL, 0x7a6a5aa8L,
0xe40ecf0bL, 0x9309ff9dL, 0x0a00ae27L, 0x7d079eb1L, 0xf00f9344L,
0x8708a3d2L, 0x1e01f268L, 0x6906c2feL, 0xf762575dL, 0x806567cbL,
0x196c3671L, 0x6e6b06e7L, 0xfed41b76L, 0x89d32be0L, 0x10da7a5aL,
0x67dd4accL, 0xf9b9df6fL, 0x8ebeeff9L, 0x17b7be43L, 0x60b08ed5L,
0xd6d6a3e8L, 0xa1d1937eL, 0x38d8c2c4L, 0x4fdff252L, 0xd1bb67f1L,
0xa6bc5767L, 0x3fb506ddL, 0x48b2364bL, 0xd80d2bdaL, 0xaf0a1b4cL,
0x36034af6L, 0x41047a60L, 0xdf60efc3L, 0xa867df55L, 0x316e8eefL,
0x4669be79L, 0xcb61b38cL, 0xbc66831aL, 0x256fd2a0L, 0x5268e236L,
0xcc0c7795L, 0xbb0b4703L, 0x220216b9L, 0x5505262fL, 0xc5ba3bbeL,
0xb2bd0b28L, 0x2bb45a92L, 0x5cb36a04L, 0xc2d7ffa7L, 0xb5d0cf31L,
0x2cd99e8bL, 0x5bdeae1dL, 0x9b64c2b0L, 0xec63f226L, 0x756aa39cL,
0x026d930aL, 0x9c0906a9L, 0xeb0e363fL, 0x72076785L, 0x05005713L,
0x95bf4a82L, 0xe2b87a14L, 0x7bb12baeL, 0x0cb61b38L, 0x92d28e9bL,
0xe5d5be0dL, 0x7cdcefb7L, 0x0bdbdf21L, 0x86d3d2d4L, 0xf1d4e242L,
0x68ddb3f8L, 0x1fda836eL, 0x81be16cdL, 0xf6b9265bL, 0x6fb077e1L,
0x18b74777L, 0x88085ae6L, 0xff0f6a70L, 0x66063bcaL, 0x11010b5cL,
0x8f659effL, 0xf862ae69L, 0x616bffd3L, 0x166ccf45L, 0xa00ae278L,
0xd70dd2eeL, 0x4e048354L, 0x3903b3c2L, 0xa7672661L, 0xd06016f7L,
0x4969474dL, 0x3e6e77dbL, 0xaed16a4aL, 0xd9d65adcL, 0x40df0b66L,
0x37d83bf0L, 0xa9bcae53L, 0xdebb9ec5L, 0x47b2cf7fL, 0x30b5ffe9L,
0xbdbdf21cL, 0xcabac28aL, 0x53b39330L, 0x24b4a3a6L, 0xbad03605L,
0xcdd70693L, 0x54de5729L, 0x23d967bfL, 0xb3667a2eL, 0xc4614ab8L,
0x5d681b02L, 0x2a6f2b94L, 0xb40bbe37L, 0xc30c8ea1L, 0x5a05df1bL,
0x2d02ef8dL
};

/* Return a 32-bit CRC of the contents of the buffer. */

u_int32_t
kafka_crc32(const u_char *s, size_t len)
{
size_t i;
u_int32_t ret;

ret = 0;
for (i = 0; i < len; i++)
ret = crc32_tab[(ret ^ s[i]) & 0xff] ^ (ret >> 8);
return ret;
}
Loading