Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka feature 3105 v5 #4096

Closed
wants to merge 1 commit into from
Closed

Conversation

CosmosSun
Copy link

@CosmosSun CosmosSun commented Aug 8, 2019

This changeset ensures the kafka output ability for eve.

  • A new output method convenient for data analysis.
  • Configure with --enable-rdkafka option when you need outpout eve with kafka.
  • Update eve kafka output conf doc

Continuation of #4093.

Make sure these boxes are signed before submitting your Pull Request -- thank you.

Link to redmine ticket:
https://redmine.openinfosecfoundation.org/issues/3105
Describe changes:

  • Kafka output method convenient for data analysis
  • Update eve kafka output conf doc

PRScript output (if applicable):

This changeset ensures the kafka output ability for eve.
- A new output method convenient for data analysis.
- Configure with --enable-rdkafka option when you need outpout eve with kafka.
- Update eve kafka output conf doc
@CosmosSun CosmosSun requested review from norg and a team as code owners August 8, 2019 03:34
@CosmosSun CosmosSun closed this Aug 8, 2019
@CosmosSun CosmosSun reopened this Aug 8, 2019
@CosmosSun CosmosSun closed this Aug 8, 2019
@CosmosSun CosmosSun reopened this Aug 8, 2019
@norg
Copy link
Member

norg commented Aug 8, 2019

Doc looks fine to me, a bit more explanation for new users might be helpful

@jryberg
Copy link

jryberg commented Aug 17, 2019

This is really awesome!! Is there anything blocking this merge?

@CosmosSun
Copy link
Author

This is really awesome!! Is there anything blocking this merge?
https://redmine.openinfosecfoundation.org/issues/3105#note-3
maybe i should solved all these possible cases

gethostname(hostname, 1023);
json_ctx->file_ctx->sensor_name = SCStrdup(hostname);
}
if (json_ctx->file_ctx->sensor_name == NULL) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: 2 blanks between sensor_name and ==

int error_code,
void *opaque, void *msg_opaque)
{
rk = rk;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lhs = rhs ... what's this for? L108-L112

rd_kafka_topic_conf_t *topic_conf;
char tmp[16];
char errstr[512];
kafka_ctx = (SCLogKafkaContext*) SCCalloc(1, sizeof(SCLogKafkaContext));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure kafka_ctx is freed on error paths/early exits.

exit(EXIT_FAILURE);
}

conf = rd_kafka_conf_new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure you free conf with rd_kafka_topic_conf_destroy on error paths/early exits.

@@ -0,0 +1,203 @@
/* Copyright (C) 2007-2019 Open Information Security Foundation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: change date to just 2019

#include "util-log-kafka.h"
#include "util-logopenfile.h"

#ifdef HAVE_LIBRDKAFKA
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, this guard should be moved prior to the first #include statement.

if (!log_ctx->sensor_name) {
char hostname[1024];
gethostname(hostname, 1023);
log_ctx->sensor_name = SCStrdup(hostname);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where will sensor_name be freed if SCConfLogOpenKafka fails?

@victorjulien
Copy link
Member

Please submit a new PR with the requested updated, thanks!

@victorjulien
Copy link
Member

Btw @dbcfd how do you feel about this? Asking because of your https://github.com/dbcfd/surikafka project.

@CosmosSun why do you think this should be a part of Suricata itself instead of using something like https://github.com/dbcfd/surikafka ?

@CosmosSun
Copy link
Author

Btw @dbcfd how do you feel about this? Asking because of your https://github.com/dbcfd/surikafka project.

@CosmosSun why do you think this should be a part of Suricata itself instead of using something like https://github.com/dbcfd/surikafka ?
I have not seen surikafka before, so i add a similar feature.
It may provide a new output type directly.

@dbcfd
Copy link

dbcfd commented Aug 27, 2019

My general feeling is that suricata should focus on using file and UDS for output, to reduce maintenance cost associated with other output types (e.g. kafka and redis). Continuing to add additional output types increases maintenance cost for features that may see limited use, and can be achieved by using a unix pipe like approach, where the output handler is external to suricata. This also allows the output handler to have a different release cycle, to address possible bugs, library upgrades, etc.

I feel even stronger about this with kafka, due to its significant configuration and failure modes. Kafka is more likely to introduce performance impacts than redis, especially if it is configured as a blocking, don't drop events mode with an ack level for broker and replicates.

victorjulien added a commit to victorjulien/suricata that referenced this pull request Feb 3, 2021
Sleep 250 microseconds instead of 100 as running in KVM cause the
old value to use 100% CPU for these threads.

Perf testing suggests no measurable impact for the non-KVM case.

Ticket: OISF#4096
victorjulien added a commit to victorjulien/suricata that referenced this pull request Feb 3, 2021
Sleep 250 microseconds instead of 100 as running in KVM cause the
old value to use 100% CPU for these threads.

Perf testing suggests no measurable impact for the non-KVM case.

Ticket: OISF#4096
victorjulien added a commit to victorjulien/suricata that referenced this pull request Feb 15, 2021
Sleep 250 microseconds instead of 100 as running in KVM cause the
old value to use 100% CPU for these threads.

Perf testing suggests no measurable impact for the non-KVM case.

Ticket: OISF#4096
(cherry picked from commit 17a38f1)
@zhouhanAC
Copy link

You should use rd_kafka_poll after each call to rd_kafka_produce, or call rd_kafka_poll regularly, otherwise the VmRSS of the process will continue to grow, causing memory leaks:

`int LogFileWriteKafka(void *lf_ctx, const char *string, size_t string_len)
{
LogFileCtx *log_ctx = lf_ctx;
SCLogKafkaContext *kafka_ctx = log_ctx->kafka;
int partition = kafka_ctx->partition % (log_ctx->kafka_setup.partitions);

if (rd_kafka_produce(kafka_ctx->rkt, partition,
		RD_KAFKA_MSG_F_COPY,
		/* Payload and length */
		(void *)string, string_len,
		/* Optional key and its length */
		NULL, 0,
		/* Message opaque, provided in
		 * delivery report callback as
		 * msg_opaque. */
		NULL) == -1) 
{
	SCLogError(SC_ERR_KAFKA,
			"%% Failed to produce to topic %s "
			"partition %i: %s\n",
			log_ctx->kafka_setup.topic_name, partition,
			rd_kafka_err2str(
				rd_kafka_errno2err(errno)));
	/* Poll to handle delivery reports */
	rd_kafka_poll(kafka_ctx->rk, 0);
    return -1;
}
kafka_ctx->partition++;

rd_kafka_poll(kafka_ctx->rk, 0 /*non-blocking*/);
return 0;

}`

@victorjulien
Copy link
Member

@zhouhanAC please report bugs in our issue tracker instead of very old and closed PRs, thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

7 participants