diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 9be3970b1c41..8f06398eb0c3 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -18,9 +18,6 @@ package kafka import ( - "errors" - "time" - "github.com/Shopify/sarama" "github.com/elastic/beats/v7/libbeat/beat" @@ -32,20 +29,9 @@ import ( ) const ( - defaultWaitRetry = 1 * time.Second - - // NOTE: maxWaitRetry has no effect on mode, as logstash client currently does - // not return ErrTempBulkFailure - defaultMaxWaitRetry = 60 * time.Second - logSelector = "kafka" ) -var ( - errNoTopicSet = errors.New("No topic configured") - errNoHosts = errors.New("No hosts configured") -) - func init() { sarama.Logger = kafkaLogger{log: logp.NewLogger(logSelector)} diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index 0cc751d99b9e..2be42f639e79 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -45,8 +45,9 @@ import ( ) const ( - kafkaDefaultHost = "localhost" - kafkaDefaultPort = "9092" + kafkaDefaultHost = "kafka" + kafkaDefaultPort = "9092" + kafkaDefaultSASLPort = "9093" ) type eventInfo struct { @@ -183,6 +184,37 @@ func TestKafkaPublish(t *testing.T) { "type": "log", }), }, + { + "publish single event to test topic", + map[string]interface{}{}, + testTopic, + single(common.MapStr{ + "host": "test-host", + "message": id, + }), + }, + { + // Initially I tried rerunning all tests over SASL/SCRAM, but + // that added a full 30sec to the test. Instead most tests run + // in plaintext, and individual tests can switch to SCRAM + // by inserting the config in this example: + "publish single event to test topic over SASL/SCRAM", + map[string]interface{}{ + "hosts": []string{getTestSASLKafkaHost()}, + "protocol": "https", + "sasl.mechanism": "SCRAM-SHA-512", + "ssl.certificate_authorities": []string{ + "../../../testing/environments/docker/kafka/certs/ca-cert", + }, + "username": "beats", + "password": "KafkaTest", + }, + testTopic, + single(common.MapStr{ + "host": "test-host", + "message": id, + }), + }, } defaultConfig := map[string]interface{}{ @@ -322,6 +354,13 @@ func getTestKafkaHost() string { ) } +func getTestSASLKafkaHost() string { + return fmt.Sprintf("%v:%v", + getenv("KAFKA_HOST", kafkaDefaultHost), + getenv("KAFKA_SASL_PORT", kafkaDefaultSASLPort), + ) +} + func makeConfig(t *testing.T, in map[string]interface{}) *common.Config { cfg, err := common.NewConfigFrom(in) if err != nil { diff --git a/testing/environments/docker/kafka/Dockerfile b/testing/environments/docker/kafka/Dockerfile index ff38db49e393..1a5e58836bc3 100644 --- a/testing/environments/docker/kafka/Dockerfile +++ b/testing/environments/docker/kafka/Dockerfile @@ -18,8 +18,11 @@ RUN mkdir -p ${KAFKA_LOGS_DIR} && mkdir -p ${KAFKA_HOME} && \ ADD run.sh /run.sh ADD healthcheck.sh /healthcheck.sh +ADD certs/broker.keystore.jks /broker.keystore.jks +ADD certs/client.truststore.jks /broker.truststore.jks EXPOSE 9092 +EXPOSE 9093 EXPOSE 2181 # healthcheck.sh tries to create and delete an empty kafka topic (the topic diff --git a/testing/environments/docker/kafka/README.md b/testing/environments/docker/kafka/README.md new file mode 100644 index 000000000000..6a7306e2423e --- /dev/null +++ b/testing/environments/docker/kafka/README.md @@ -0,0 +1,35 @@ +# Kafka test container + +This Docker container provides an environment for testing with Kafka. It exposes two ports to the host system, `9092` for `PLAINTEXT` and `9093` for `SASL/SSL` with username `beats` and password `KafkaTest`. + +## Certificates + +The test environment uses a self-signed SSL certificate in the broker. To connect, clients will need to set `certs/client.truststore.jks` as their trust store. + +The files in the `certs` directory were generated with these commands: + +```sh +# create the broker's key +keytool -keystore broker.keystore.jks -storepass KafkaTest -alias broker -validity 5000 -keyalg RSA -genkey + +What is your first and last name? + [Unknown]: kafka + ... + +# create a new certificate authority +openssl req -new -x509 -keyout ca-key -out ca-cert -days 5000 + +# add the CA to the kafka client's trust store +keytool -keystore client.truststore.jks -storepass KafkaTest -alias CARoot -keyalg RSA -import -file ca-cert + +# export the server certificate +keytool -keystore broker.keystore.jks -storepass KafkaTest -alias broker -certreq -file broker-cert + +# sign it with the CA +openssl x509 -req -CA ca-cert -CAkey ca-key -in broker-cert -out broker-cert-signed -days 5000 -CAcreateserial -passin pass:KafkaTest + +# import CA and signed cert back into server keystore +keytool -keystore broker.keystore.jks -storepass KafkaTest -alias CARoot -import -file ca-cert +keytool -keystore broker.keystore.jks -storepass KafkaTest -alias broker -import -file broker-cert-signed + +``` diff --git a/testing/environments/docker/kafka/certs/broker-cert b/testing/environments/docker/kafka/certs/broker-cert new file mode 100644 index 000000000000..3a7d9e2498ae --- /dev/null +++ b/testing/environments/docker/kafka/certs/broker-cert @@ -0,0 +1,18 @@ +-----BEGIN NEW CERTIFICATE REQUEST----- +MIIC3zCCAccCAQAwajEQMA4GA1UEBhMHVW5rbm93bjEQMA4GA1UECBMHVW5rbm93 +bjEQMA4GA1UEBxMHVW5rbm93bjEQMA4GA1UEChMHVW5rbm93bjEQMA4GA1UECxMH +VW5rbm93bjEOMAwGA1UEAxMFa2Fma2EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw +ggEKAoIBAQCH8VYN9FMHXjnLUwT0AJDKM0u/jXE0ng1UfWPVQaVI+Eny+vmf1zDm +d/AoqXaYKzVNvyRXCy1BZGaLVA3go1U7+tVjtniuLTmveE07PuX4w9/ukZPKlUxf +KCjYCmh38BeYiJA2inaxScDO2hxHfB2pulsM+l9+q0NMXFe6RSUAKS0pAeY8KLz9 +yWg9hfq6JPuPT14HZmyxLn+1SwRbZZ+TQjlAHfZFpu/igg6cif/ez30z5Gqci+2i +VPlwl9peEsaXn5wbuP6J2Uo6dMoGiFyxFdGCWVWP9WDncvfYKJwQs09QdbFLxAst +BYSmOTszUP+h0SohaxpdC4AOcJxs+MwhAgMBAAGgMDAuBgkqhkiG9w0BCQ4xITAf +MB0GA1UdDgQWBBRFzbnwQXp+h4xE233eH3D+KfozxTANBgkqhkiG9w0BAQsFAAOC +AQEAQti4SPU8KfSoeLbLUic7UciVmwO0TZtiG+Y6fCTdRm7SYovg2zXH576ERClf +JQCzUuMH1Fi6k5adhMUxopJrVirZWOANoffe3yY/PUuFPMv5rvjmG7JqRNloNFYC +4Jah/XeITkw3BcwYxvY3lOZeXgBoRI+PwaD4JNHYf9ruc8cxY59lbWGCQOdbWYuk +ex/Y/rdmiv1cZpVAYY3VkdUnISXf4eePz4+hUdyuNGYt8Rh/dCj0D/1Xdo9jguUw +IWihuXNfH5hBzBp2hX49tCa7j8stOQW6+AS+ysUBRseFNnsu9j95PD+ue9GU5ZLR +mQzlkeZcfimH796e6XF81oCDkA== +-----END NEW CERTIFICATE REQUEST----- diff --git a/testing/environments/docker/kafka/certs/broker-cert-signed b/testing/environments/docker/kafka/certs/broker-cert-signed new file mode 100644 index 000000000000..b023f3c146d2 --- /dev/null +++ b/testing/environments/docker/kafka/certs/broker-cert-signed @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC8zCCAdsCCQC1GCJdAf28SzANBgkqhkiG9w0BAQUFADANMQswCQYDVQQGEwJV +UzAeFw0yMTEwMjEyMDM0MTBaFw0zNTA2MzAyMDM0MTBaMGoxEDAOBgNVBAYTB1Vu +a25vd24xEDAOBgNVBAgTB1Vua25vd24xEDAOBgNVBAcTB1Vua25vd24xEDAOBgNV +BAoTB1Vua25vd24xEDAOBgNVBAsTB1Vua25vd24xDjAMBgNVBAMTBWthZmthMIIB +IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAh/FWDfRTB145y1ME9ACQyjNL +v41xNJ4NVH1j1UGlSPhJ8vr5n9cw5nfwKKl2mCs1Tb8kVwstQWRmi1QN4KNVO/rV +Y7Z4ri05r3hNOz7l+MPf7pGTypVMXygo2Apod/AXmIiQNop2sUnAztocR3wdqbpb +DPpffqtDTFxXukUlACktKQHmPCi8/cloPYX6uiT7j09eB2ZssS5/tUsEW2Wfk0I5 +QB32Rabv4oIOnIn/3s99M+RqnIvtolT5cJfaXhLGl5+cG7j+idlKOnTKBohcsRXR +gllVj/Vg53L32CicELNPUHWxS8QLLQWEpjk7M1D/odEqIWsaXQuADnCcbPjMIQID +AQABMA0GCSqGSIb3DQEBBQUAA4IBAQCMGbXC2YdC9+jJjUvuEJIQGwpapJ5Dejng +cnvE//+x8A4W9vC7OJUHcML2GGQIrgvYWlmsCEWX1lJtcVIbqkTqq9Sq99htdMfM +ay4fJB/ey005bhcbEP+19342HkmoOUkEg7qGWZhhL05y0m1vxKvKSUX3p+4TyW1Y +AheRbb9j41Ld3E8+COGwqIWpMNfsGjLqWjUIajemFH91Eo2FFvshM/5ly12GZEil +ivmUqSzV7o6ri0V7DZ5NPOSXEbiMQj5FfmImqXbo7JtBqM/H9S2yAPXZBfAloVNv +XvjG0dY8cnYwGL5MSRiZEuJdimptWnMzFXbD8zyRxSIUMpbDcHNf +-----END CERTIFICATE----- diff --git a/testing/environments/docker/kafka/certs/broker.keystore.jks b/testing/environments/docker/kafka/certs/broker.keystore.jks new file mode 100644 index 000000000000..aa03364e3ac6 Binary files /dev/null and b/testing/environments/docker/kafka/certs/broker.keystore.jks differ diff --git a/testing/environments/docker/kafka/certs/ca-cert b/testing/environments/docker/kafka/certs/ca-cert new file mode 100644 index 000000000000..725647f9d8ba --- /dev/null +++ b/testing/environments/docker/kafka/certs/ca-cert @@ -0,0 +1,16 @@ +-----BEGIN CERTIFICATE----- +MIICljCCAX4CCQD+dvzut8IfyTANBgkqhkiG9w0BAQsFADANMQswCQYDVQQGEwJV +UzAeFw0yMTEwMjEyMDMyMDJaFw0zNTA2MzAyMDMyMDJaMA0xCzAJBgNVBAYTAlVT +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAlR6sKchCTM0qdrjAdWqR +BmSLfHHe+LB43B0T/+3Y1fporzg2eZC1dPCf7TXLzL92NOlJ+JQCsfb160gKTGfb +7+z2jm+vumbYlKVffsD7MjNdW8SDu9hfMa2DyTY742n3R/X8pc4VK0fdlTQx22Zp +aIA+XwD6hHxZQS9PHVNwTFUoPkP4jevcFANwjLUBgy3dPK0iWdVILnaAwEBg82z3 +zWRJ7I4Eg6KS+GtwZPovhiHqcJpz7QPrmggCglL8q0YZQrVrYNucRV1sjPAhEfTA +Sh7Z0UVYdx5+jJq7MyslBqzEM0OrmKrldrTHOAo9+cTc1GiKGRBhVei2R2fP2XAC +HQIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQA37KiWMR6SZRmlLKV7hP9/9H1cL1FJ +OPa5MKcwh8Q38IRALCF5SlxxOByP8O01ZInkWjR3jJbMc/k4RwxQXfzYDvB4jleU +MyX63qekIsxFdUn+fzt+wA0xb7tOPGVUbM6QI++YH28p8yzSdY/bXrjRweQuVRC0 +B+0zMijI1uU6GRME9+e1OLsN5rDzCFEJUra/+UDc23BTOjC6Az00UKpOGv6oAqg8 +iuCOeVCRVPtd7mGJK1dGW3WXV3pbsu4EvfXve9qFFV/7d811JNBjnhF5lFN2JGVs +Ka9JebJ8EKWff6Ns14FJ2cOG3tx7KuWcnfTdma/mH4PeGoU1Og5Ln/ea +-----END CERTIFICATE----- diff --git a/testing/environments/docker/kafka/certs/ca-cert.srl b/testing/environments/docker/kafka/certs/ca-cert.srl new file mode 100644 index 000000000000..3336d99a9a30 --- /dev/null +++ b/testing/environments/docker/kafka/certs/ca-cert.srl @@ -0,0 +1 @@ +B518225D01FDBC4B diff --git a/testing/environments/docker/kafka/certs/ca-key b/testing/environments/docker/kafka/certs/ca-key new file mode 100644 index 000000000000..63ab92b23aa2 --- /dev/null +++ b/testing/environments/docker/kafka/certs/ca-key @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFHzBJBgkqhkiG9w0BBQ0wPDAbBgkqhkiG9w0BBQwwDgQIPiUp46K/yl8CAggA +MB0GCWCGSAFlAwQBKgQQ3vI4jI41do9rQtAyc+JwfwSCBNDDDv3uSHZ/12ACMLyu +vhrz3en62CT4ooeZKjpep4H/s+2pfqfg5bDeQUguNUo2zsy0EKeK2rIBVUG6KWIa +DWvnl0JaM/TrtGg1pMZMIFPowaf1mNcVcvN5IWEgLgoT+SY8lLtmVqIdU1d2F6nV +t+7JEpktXA5ThS0FbBpW0XI+kG9W7Ln9YPjGLbjcPayJQX1yzWJYEUfmm8lJl7cT +h5V94sB2KP9pwAP/SqElt2QK1BhStUVv3ezp6TT82PETi3No0Uh+oWaxltjPCr4z +5MN/tTDJQGc3llDIrZT+umZcgB6DBsc+nXjrlAHWPeuhIcjNJGs3V0xazQs60M6n +ldhcJH453Muwtp40VDkT21plVPwUrwQX/6gIWIHnyvK44sRG4NvmG+4NBA8V4TdK +AKhkhYTYS+sAUDsAFo408OXvpdGy7G2/cZn+r2frLHLxUU2peqRFP7YqLVs2sdez +sFyt6ZMSAh8UZDYK9kpyQMoeYj7Az14kMKIlE0JADsd3Mn8S/QJrrKWQzQhQVz6O +0rpaGnIM3cICgTK7gTlK+lDIbqAmCYnFLQsU9rHIpzVMkx2iYEId+YNbxodHpFPa +MCz6HU8qI9Tv9JIOfJKdE7tvlSnR89usOU/z+NSGqKm1dhYjG1BNI7wk8/mgMxOg +9BAujodmGvFpMPba84+QT/AtTy9YMMi4Z0H7BKHGD7HwSOTx7kP9hMz/sVVnKxfO +8C9gE91D4enrpQXu7J5JU07LCWSNLiZEegbdKvjBz5Cvfj5LPhazTLYuuU0KNIP9 +MjrgodrSp1LgESAA7z8qKUyhX2Z6uO0q1Q5OUFgGNEWXSYLplhWrvftPqdV0YAFI +4y794sojVBBnHYo+Lm5gugm4cg6bLk/YY3ScQqPYEUwO1LZSMUoB9ixLHUYY48ND +xbevM9V8vLgb6Q46zTCYPxwYfxNlcWxeQjwbVEaha5n2Sgu0dmrG/+LjrEwYtHY7 +zPdTbl28OyvXDqvilXcDQS8ZQBwqkZ00pg9fokElztgVIMp4cbtBTCiqipfNBGJg +ALEu/lFNlGjvv4iwOdx/yhVjFt2Ri3ViTEoTJ3wAh3o4wh/o8wluNb3bMgfKzw61 +/WptUvLnqKIGQ0xZtunxG9WHIpc8oTRZMMUgLnoVzJvdU9cONT5GER9WuQbwXmEE +ytIx9tVq4cb3CoJhynrL9cjGp716nBkx534gyu5N21elb8npk1XAHd6AHUViun0J +TnVHPwSSLN7naaiMKS+8KaknAdjvKCIUytLSRpRb4rkoqD/7MlYlMTAPF5IX6/Xj +fVfR8HKWtkvqhAM4lQ57zwGlpXifGM5Vi/Dq8JYcTOIHIzggbhfi+WVwWJ+SJVp3 +FQBSvyJ0XFV8piuP6J1PB6zXLioRiUMDbrl0Hmwo4spLswRsZ6D/6QuNUeNN2Lh1 +ZqtkAHWnIll1nviSEWPxiu0lA9ZwfPP1t+H0UkVi8JBUCrTh0gyr2e/CGZAd1GoP +/LnvaRntmqytavI65NlPPlvF9S7enjeEkxvtqhAIuU9nTMORnmpXX+xStfm/AtQp +2UNklwWW6bwPhMF9w+FnuJoK7mrQ5DphsZNcTly1RQ0uQkT6yrzWK5MNmLRiNOez +OmM968GQKexUL9r0BmFi7T00rQ== +-----END ENCRYPTED PRIVATE KEY----- diff --git a/testing/environments/docker/kafka/certs/client.truststore.jks b/testing/environments/docker/kafka/certs/client.truststore.jks new file mode 100644 index 000000000000..7b18b3f645ed Binary files /dev/null and b/testing/environments/docker/kafka/certs/client.truststore.jks differ diff --git a/testing/environments/docker/kafka/run.sh b/testing/environments/docker/kafka/run.sh index 873f6951acc1..bfacf2a7242e 100755 --- a/testing/environments/docker/kafka/run.sh +++ b/testing/environments/docker/kafka/run.sh @@ -16,13 +16,31 @@ echo "Starting ZooKeeper" ${KAFKA_HOME}/bin/zookeeper-server-start.sh ${KAFKA_HOME}/config/zookeeper.properties & wait_for_port 2181 +# create a user beats with password KafkaTest, for use in client SASL authentication +/kafka/bin/kafka-configs.sh \ + --zookeeper localhost:2181 \ + --alter --add-config 'SCRAM-SHA-512=[password=KafkaTest]' \ + --entity-type users \ + --entity-name beats + echo "Starting Kafka broker" mkdir -p ${KAFKA_LOGS_DIR} ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties \ - --override delete.topic.enable=true --override advertised.host.name=${KAFKA_ADVERTISED_HOST} \ - --override listeners=PLAINTEXT://0.0.0.0:9092 \ - --override logs.dir=${KAFKA_LOGS_DIR} --override log.flush.interval.ms=200 \ - --override num.partitions=3 & + --override delete.topic.enable=true \ + --override advertised.host.name=${KAFKA_ADVERTISED_HOST} \ + --override listeners=PLAINTEXT://0.0.0.0:9092,SASL_SSL://0.0.0.0:9093 \ + --override advertised.listeners=PLAINTEXT://${KAFKA_ADVERTISED_HOST}:9092,SASL_SSL://${KAFKA_ADVERTISED_HOST}:9093 \ + --override inter.broker.listener.name=PLAINTEXT \ + --override sasl.enabled.mechanisms=SCRAM-SHA-512 \ + --override listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required;" \ + --override logs.dir=${KAFKA_LOGS_DIR} \ + --override log4j.logger.kafka=DEBUG,kafkaAppender \ + --override log.flush.interval.ms=200 \ + --override num.partitions=3 \ + --override ssl.keystore.location=/broker.keystore.jks \ + --override ssl.keystore.password=KafkaTest \ + --override ssl.truststore.location=/broker.truststore.jks \ + --override ssl.truststore.password=KafkaTest & wait_for_port 9092