diff --git a/.kitchen.yml b/.kitchen.yml index 21409168..ed1668b0 100644 --- a/.kitchen.yml +++ b/.kitchen.yml @@ -119,3 +119,40 @@ suites: instances: - host: localhost port: 27017 + +- name: dd-agent-kafka + run_list: + - recipe[datadog::dd-handler] + - recipe[datadog::kafka] + attributes: + datadog: + api_key: somethingnotnil + application_key: alsonotnil + kafka: + instances: + - host: localhost + port: 9999 + name: my_kafka + user: username + password: password + java_bin_path: /path/to/java + trust_store_path: /path/to/trustStore.jks + trust_store_password: password + +- name: dd-agent-kafka-consumer + run_list: + - recipe[datadog::dd-handler] + - recipe[datadog::kafka_consumer] + attributes: + datadog: + api_key: somethingnotnil + application_key: alsonotnil + kafka_consumer: + instances: + - kafka_connect_str: localhost:19092 + zk_connect_str: localhost:2181 + zk_prefix: /0.8 + consumer_groups: + my_consumer: + my_topic: [0, 1, 4, 12] + diff --git a/recipes/kafka.rb b/recipes/kafka.rb new file mode 100644 index 00000000..ce3f5296 --- /dev/null +++ b/recipes/kafka.rb @@ -0,0 +1,25 @@ +include_recipe "datadog::dd-agent" + +# Monitor Kafka +# +# Assuming you have 2 clusters "test" and "prod", +# one with and one without authentication +# you need to set up the following attributes +# node.datadog.kafka.instances = [ +# { +# :host => "localhost", +# :port => "9999", +# :name => "prod", +# :user => "username", +# :password => "secret" +# }, +# { +# :host => "localhost", +# :port => "8199", +# :name => "test" +# } +# ] + +datadog_monitor "kafka" do + instances node['datadog']['kafka']['instances'] +end diff --git a/recipes/kafka_consumer.rb b/recipes/kafka_consumer.rb new file mode 100644 index 00000000..1d013bdb --- /dev/null +++ b/recipes/kafka_consumer.rb @@ -0,0 +1,16 @@ +include_recipe "datadog::dd-agent" + +# Monitor Kafka consumers +# +# You need to set up the following attributes +# node.datadog.kafka_consumer.instances = [ +# { +# :kafka_connect_str => 
"localhost:19092", +# :zk_connect_str => "localhost:2181", +# :zk_prefix => "/0.8" # optional +# } +# ] + +datadog_monitor "kafka_consumer" do + instances node['datadog']['kafka_consumer']['instances'] +end diff --git a/templates/default/kafka.yaml.erb b/templates/default/kafka.yaml.erb new file mode 100644 index 00000000..f096e04c --- /dev/null +++ b/templates/default/kafka.yaml.erb @@ -0,0 +1,164 @@ +instances: + <% @instances.each do |i| -%> + - host: <%= i['host'] %> + port: <%= i['port'] %> + <% if i['name'] -%> + name: <%= i['name'] %> + <% end -%> + <% if i['user'] -%> + user: <%= i['user'] %> + <% end -%> + <% if i['password'] -%> + password: <%= i['password'] %> + <% end -%> + <% if i['java_bin_path'] -%> + java_bin_path: <%= i['java_bin_path'] %> #Optional, should be set if the agent cannot find your java executable + <% end -%> + <% if i['trust_store_path'] -%> + trust_store_path: <%= i['trust_store_path'] %> # Optional, should be set if ssl is enabled + <% end -%> + <% if i['trust_store_password'] -%> + trust_store_password: <%= i['trust_store_password'] %> + <% end -%> + <% end -%> + +init_config: + is_jmx: true + + # Metrics collected by this check. You should not have to modify this. 
+ conf: + # + # Aggregate cluster stats + # + - include: + domain: '"kafka.server"' + bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesOutPerSec"' + attribute: + MeanRate: + metric_type: counter + alias: kafka.net.bytes_out + - include: + domain: '"kafka.server"' + bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec"' + attribute: + MeanRate: + metric_type: counter + alias: kafka.net.bytes_in + - include: + domain: '"kafka.server"' + bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsMessagesInPerSec"' + attribute: + MeanRate: + metric_type: gauge + alias: kafka.messages_in + + # + # Request timings + # + - include: + domain: '"kafka.server"' + bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsFailedFetchRequestsPerSec"' + attribute: + MeanRate: + metric_type: gauge + alias: kafka.request.fetch.failed + - include: + domain: '"kafka.server"' + bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsFailedProduceRequestsPerSec"' + attribute: + MeanRate: + metric_type: gauge + alias: kafka.request.produce.failed + - include: + domain: '"kafka.network"' + bean: '"kafka.network":type="RequestMetrics",name="Produce-TotalTimeMs"' + attribute: + Mean: + metric_type: counter + alias: kafka.request.produce.time.avg + 99thPercentile: + metric_type: counter + alias: kafka.request.produce.time.99percentile + - include: + domain: '"kafka.network"' + bean: '"kafka.network":type="RequestMetrics",name="Fetch-TotalTimeMs"' + attribute: + Mean: + metric_type: counter + alias: kafka.request.fetch.time.avg + 99thPercentile: + metric_type: counter + alias: kafka.request.fetch.time.99percentile + - include: + domain: '"kafka.network"' + bean: '"kafka.network":type="RequestMetrics",name="UpdateMetadata-TotalTimeMs"' + attribute: + Mean: + metric_type: counter + alias: kafka.request.update_metadata.time.avg + 99thPercentile: + metric_type: counter + alias: kafka.request.update_metadata.time.99percentile + - 
include: + domain: '"kafka.network"' + bean: '"kafka.network":type="RequestMetrics",name="Metadata-TotalTimeMs"' + attribute: + Mean: + metric_type: counter + alias: kafka.request.metadata.time.avg + 99thPercentile: + metric_type: counter + alias: kafka.request.metadata.time.99percentile + - include: + domain: '"kafka.network"' + bean: '"kafka.network":type="RequestMetrics",name="Offsets-TotalTimeMs"' + attribute: + Mean: + metric_type: counter + alias: kafka.request.offsets.time.avg + 99thPercentile: + metric_type: counter + alias: kafka.request.offsets.time.99percentile + + # + # Replication stats + # + - include: + domain: '"kafka.server"' + bean: '"kafka.server":type="ReplicaManager",name="ISRShrinksPerSec"' + attribute: + MeanRate: + metric_type: counter + alias: kafka.replication.isr_shrinks + - include: + domain: '"kafka.server"' + bean: '"kafka.server":type="ReplicaManager",name="ISRExpandsPerSec"' + attribute: + MeanRate: + metric_type: counter + alias: kafka.replication.isr_expands + - include: + domain: '"kafka.server"' + bean: '"kafka.server":type="ControllerStats",name="LeaderElectionRateAndTimeMs"' + attribute: + MeanRate: + metric_type: counter + alias: kafka.replication.leader_elections + - include: + domain: '"kafka.server"' + bean: '"kafka.server":type="ControllerStats",name="UncleanLeaderElectionsPerSec"' + attribute: + MeanRate: + metric_type: counter + alias: kafka.replication.unclean_leader_elections + + # + # Log flush stats + # + - include: + domain: '"kafka.log"' + bean: '"kafka.log":type="LogFlushStats",name="LogFlushRateAndTimeMs"' + attribute: + MeanRate: + metric_type: counter + alias: kafka.log.flush_rate diff --git a/templates/default/kafka_consumer.yaml.erb b/templates/default/kafka_consumer.yaml.erb new file mode 100644 index 00000000..8489eead --- /dev/null +++ b/templates/default/kafka_consumer.yaml.erb @@ -0,0 +1,21 @@ +instances: + <% @instances.each do |i| -%> + - kafka_connect_str: <%= i['kafka_connect_str'] %> + 
zk_connect_str: <%= i['zk_connect_str'] %> + <% if i['zk_prefix'] -%> + zk_prefix: <%= i['zk_prefix'] %> + <% end -%> + <% if i['consumer_groups'] -%> + consumer_groups: + <% i['consumer_groups'].each do |consumer, t| -%> + <%= consumer %>: + <% t.each do |topic, l| -%> + <%= topic %>: <%= l %> + <% end -%> + <% end -%> + <% end -%> + <% end -%> + +init_config: +# The Kafka Consumer check does not require any init_config + diff --git a/test/integration/dd-agent-kafka-consumer/bats/kafka_consumer_config.bats b/test/integration/dd-agent-kafka-consumer/bats/kafka_consumer_config.bats new file mode 100644 index 00000000..c651f22c --- /dev/null +++ b/test/integration/dd-agent-kafka-consumer/bats/kafka_consumer_config.bats @@ -0,0 +1,19 @@ +@test "kafka_consumer.yaml exists" { + [ -f /etc/dd-agent/conf.d/kafka_consumer.yaml ] +} + +@test "kafka_consumer.yaml is correct" { + export PYTHONPATH=/usr/share/datadog/agent/ + script='import yaml, json, sys; print json.dumps(yaml.load(sys.stdin.read()))' + actual=$(cat /etc/dd-agent/conf.d/kafka_consumer.yaml | python -c "$script") + + expected='{"instances": [{"zk_connect_str": "localhost:2181", "kafka_connect_str": "localhost:19092", "consumer_groups": {"my_consumer": {"my_topic": [0, 1, 4, 12]}}, "zk_prefix": "/0.8"}], "init_config": null}' + + echo "Expected: $expected" + echo "Actual: $actual" + [ "x$actual" == "x$expected" ] +} + + + + diff --git a/test/integration/dd-agent-kafka/bats/kafka_config.bats b/test/integration/dd-agent-kafka/bats/kafka_config.bats new file mode 100644 index 00000000..9a9988b4 --- /dev/null +++ b/test/integration/dd-agent-kafka/bats/kafka_config.bats @@ -0,0 +1,19 @@ +@test "kafka.yaml exists" { + [ -f /etc/dd-agent/conf.d/kafka.yaml ] +} + +@test "kafka.yaml is correct" { + export PYTHONPATH=/usr/share/datadog/agent/ + script='import yaml, json, sys; print json.dumps(yaml.load(sys.stdin.read()))' + actual=$(cat /etc/dd-agent/conf.d/kafka.yaml | python -c "$script") + + expected='{"instances": 
[{"name": "my_kafka", "trust_store_password": "password", "host": "localhost", "trust_store_path": "/path/to/trustStore.jks", "user": "username", "java_bin_path": "/path/to/java", "password": "password", "port": 9999}], "init_config": {"is_jmx": true, "conf": [{"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.net.bytes_out"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsBytesOutPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.net.bytes_in"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsBytesInPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "gauge", "alias": "kafka.messages_in"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsMessagesInPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "gauge", "alias": "kafka.request.fetch.failed"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsFailedFetchRequestsPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "gauge", "alias": "kafka.request.produce.failed"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsFailedProduceRequestsPerSec\""}, {"attribute": {"99thPercentile": {"metric_type": "counter", "alias": "kafka.request.produce.time.99percentile"}, "Mean": {"metric_type": "counter", "alias": "kafka.request.produce.time.avg"}}, "domain": "\"kafka.network\"", "include": null, "bean": "\"kafka.network\":type=\"RequestMetrics\",name=\"Produce-TotalTimeMs\""}, {"attribute": {"99thPercentile": {"metric_type": "counter", "alias": "kafka.request.fetch.time.99percentile"}, "Mean": {"metric_type": "counter", "alias": "kafka.request.fetch.time.avg"}}, "domain": "\"kafka.network\"", "include": null, "bean": 
"\"kafka.network\":type=\"RequestMetrics\",name=\"Fetch-TotalTimeMs\""}, {"attribute": {"99thPercentile": {"metric_type": "counter", "alias": "kafka.request.update_metadata.time.99percentile"}, "Mean": {"metric_type": "counter", "alias": "kafka.request.update_metadata.time.avg"}}, "domain": "\"kafka.network\"", "include": null, "bean": "\"kafka.network\":type=\"RequestMetrics\",name=\"UpdateMetadata-TotalTimeMs\""}, {"attribute": {"99thPercentile": {"metric_type": "counter", "alias": "kafka.request.metadata.time.99percentile"}, "Mean": {"metric_type": "counter", "alias": "kafka.request.metadata.time.avg"}}, "domain": "\"kafka.network\"", "include": null, "bean": "\"kafka.network\":type=\"RequestMetrics\",name=\"Metadata-TotalTimeMs\""}, {"attribute": {"99thPercentile": {"metric_type": "counter", "alias": "kafka.request.offsets.time.99percentile"}, "Mean": {"metric_type": "counter", "alias": "kafka.request.offsets.time.avg"}}, "domain": "\"kafka.network\"", "include": null, "bean": "\"kafka.network\":type=\"RequestMetrics\",name=\"Offsets-TotalTimeMs\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.replication.isr_shrinks"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"ReplicaManager\",name=\"ISRShrinksPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.replication.isr_expands"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"ReplicaManager\",name=\"ISRExpandsPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.replication.leader_elections"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"ControllerStats\",name=\"LeaderElectionRateAndTimeMs\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.replication.unclean_leader_elections"}}, "domain": "\"kafka.server\"", "include": null, "bean": 
"\"kafka.server\":type=\"ControllerStats\",name=\"UncleanLeaderElectionsPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.log.flush_rate"}}, "domain": "\"kafka.log\"", "include": null, "bean": "\"kafka.log\":type=\"LogFlushStats\",name=\"LogFlushRateAndTimeMs\""}]}}' + + echo "Expected: $expected" + echo "Actual: $actual" + [ "x$actual" == "x$expected" ] +} + + + +