From 1603e6487f570b687d60358a0e0ffc2b9a65678d Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Wed, 17 Oct 2018 16:59:12 +0200 Subject: [PATCH] Fix kafka broker matching when port is not set (#8613) Fix kafka broker matching when port is not set and add tests for broker matcher function. --- CHANGELOG.asciidoc | 1 + metricbeat/module/kafka/broker.go | 74 ++++++++---- metricbeat/module/kafka/broker_test.go | 159 +++++++++++++++++++++++++ 3 files changed, 212 insertions(+), 22 deletions(-) create mode 100644 metricbeat/module/kafka/broker_test.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 670f01d5029a..55ae094be718 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -85,6 +85,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Fix dropwizard module parsing of metric names. {issue}8365[8365] {pull}6385[8385] - Added io disk read and write times to system module {issue}8473[8473] {pull}8508[8508] - Avoid mapping issues in kubernetes module. {pull}8487[8487] +- Fix issue that would prevent kafka module to find a proper broker when port is not set {pull}8613[8613] *Packetbeat* diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index 1efff5cd1f2c..da3dfacb3c83 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -122,7 +122,8 @@ func (b *Broker) Connect() error { return err } - other := findMatchingBroker(brokerAddress(b.broker), meta.Brokers) + finder := brokerFinder{Net: &defaultNet{}} + other := finder.findBroker(brokerAddress(b.broker), meta.Brokers) if other == nil { // no broker found closeBroker(b.broker) return fmt.Errorf("No advertised broker with address %v found", b.Addr()) @@ -352,36 +353,65 @@ func checkRetryQuery(err error) (retry, reconnect bool) { return false, false } -func findMatchingBroker( - addr string, - brokers []*sarama.Broker, -) *sarama.Broker { +// NetInfo can be used to obtain network information +type NetInfo interface { + LookupIP(string) ([]net.IP, error) + LookupAddr(string) ([]string, error) + LocalIPAddrs() ([]net.IP, error) + Hostname() (string, error) +} + +type defaultNet struct{} + +// LookupIP looks up a host using the local resolver +func (m *defaultNet) LookupIP(addr string) ([]net.IP, error) { + return net.LookupIP(addr) +} + +// LookupAddr returns the list of hosts resolving to an specific address +func (m *defaultNet) LookupAddr(address string) ([]string, error) { + return net.LookupAddr(address) +} + +// LocalIPAddrs return the list of IP addresses configured in local network interfaces +func (m *defaultNet) LocalIPAddrs() ([]net.IP, error) { + return common.LocalIPAddrs() +} + +// Hostname returns the hostname reported by the OS +func (m *defaultNet) Hostname() (string, error) { + return os.Hostname() +} + +type brokerFinder struct { + Net NetInfo +} + +func (m *brokerFinder) findBroker(addr string, brokers []*sarama.Broker) *sarama.Broker { lst := brokerAddresses(brokers) - if idx, found := findMatchingAddress(addr, lst); found { + if idx, found := m.findAddress(addr, lst); found { return brokers[idx] } return nil } -func findMatchingAddress( - addr string, - brokers []string, -) (int, bool) { +func (m *brokerFinder) findAddress(addr string, brokers []string) (int, bool) { debugf("Try to match broker to: %v", addr) - // compare connection address to list of broker addresses - if i, found := indexOf(addr, brokers); found { - return i, true - } - // get connection 'port' host, port, err := net.SplitHostPort(addr) if err != nil || port == "" { + host = addr port = "9092" } + // compare connection address to list of broker addresses + if i, found := indexOf(net.JoinHostPort(host, port), brokers); found { + return i, true + } + // lookup local machines ips for comparing with broker addresses - localIPs, err := common.LocalIPAddrs() + localIPs, err := m.Net.LocalIPAddrs() if err != nil || len(localIPs) == 0 { return -1, false } @@ -389,7 +419,7 @@ func findMatchingAddress( // try to find broker by comparing the fqdn for each known ip to list of // brokers - localHosts := lookupHosts(localIPs) + localHosts := m.lookupHosts(localIPs) debugf("local machine addresses: %v", localHosts) for _, host := range localHosts { debugf("try to match with fqdn: %v (%v)", host, port) @@ -401,7 +431,7 @@ func findMatchingAddress( // try matching ip of configured host with broker list, this would // match if hosts of advertised addresses are IPs, but configured host // is a hostname - ips, err := net.LookupIP(host) + ips, err := m.Net.LookupIP(host) if err == nil { for _, ip := range ips { addr := net.JoinHostPort(ip.String(), port) @@ -413,7 +443,7 @@ func findMatchingAddress( // try to find broker id by comparing the machines local hostname to // broker hostnames in metadata - if host, err := os.Hostname(); err == nil { + if host, err := m.Net.Hostname(); err == nil { debugf("try to match with hostname only: %v (%v)", host, port) tmp := net.JoinHostPort(strings.ToLower(host), port) @@ -437,7 +467,7 @@ func findMatchingAddress( } // lookup all ips for brokers host: - ips, err := net.LookupIP(bh) + ips, err := m.Net.LookupIP(bh) debugf("broker %v ips: %v, %v", bh, ips, err) if err != nil { continue @@ -454,7 +484,7 @@ func findMatchingAddress( return -1, false } -func lookupHosts(ips []net.IP) []string { +func (m *brokerFinder) lookupHosts(ips []net.IP) []string { set := map[string]struct{}{} for _, ip := range ips { txt, err := ip.MarshalText() @@ -462,7 +492,7 @@ func lookupHosts(ips []net.IP) []string { continue } - hosts, err := net.LookupAddr(string(txt)) + hosts, err := m.Net.LookupAddr(string(txt)) debugf("lookup %v => %v, %v", string(txt), hosts, err) if err != nil { continue diff --git a/metricbeat/module/kafka/broker_test.go b/metricbeat/module/kafka/broker_test.go new file mode 100644 index 000000000000..b9d7262683b9 --- /dev/null +++ b/metricbeat/module/kafka/broker_test.go @@ -0,0 +1,159 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package kafka + +import ( + "net" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +type dummyNet struct{} + +func (m *dummyNet) LookupIP(addr string) ([]net.IP, error) { + dns := map[string][]net.IP{ + "kafka1": []net.IP{net.IPv4(10, 0, 0, 1)}, + "kafka2": []net.IP{net.IPv4(10, 0, 0, 2)}, + "kafka3": []net.IP{net.IPv4(10, 0, 0, 3)}, + } + ips, found := dns[addr] + if !found { + return nil, errors.New("not found") + } + return ips, nil +} + +func (m *dummyNet) LookupAddr(addr string) ([]string, error) { + dns := map[string][]string{ + "10.0.0.1": []string{"kafka1"}, + "10.0.0.2": []string{"kafka2"}, + "10.0.0.3": []string{"kafka3"}, + } + names, found := dns[addr] + if !found { + return nil, errors.New("not found") + } + return names, nil +} + +func (m *dummyNet) LocalIPAddrs() ([]net.IP, error) { + return []net.IP{ + net.IPv4(127, 0, 0, 1), + net.IPv4(10, 0, 0, 2), + net.IPv4(10, 1, 0, 2), + }, nil +} + +func (m *dummyNet) Hostname() (string, error) { + return "kafka2", nil +} + +func TestFindMatchingAddress(t *testing.T) { + cases := []struct { + title string + address string + brokers []string + index int + exists bool + }{ + { + title: "exists", + address: "10.0.0.2:9092", + brokers: []string{"10.0.0.1:9092", "10.0.0.2:9092"}, + index: 1, + exists: true, + }, + { + title: "doesn't exist", + address: "8.8.8.8:9092", + brokers: []string{"10.0.0.1:9092", "10.0.0.2:9092"}, + exists: false, + }, + { + title: "exists on default port", + address: "10.0.0.2", + brokers: []string{"10.0.0.1:9092", "10.0.0.2:9092"}, + index: 1, + exists: true, + }, + { + title: "multiple brokers on same host", + address: "127.0.0.1:9093", + brokers: []string{"127.0.0.1:9092", "127.0.0.1:9093", "127.0.0.1:9094"}, + index: 1, + exists: true, + }, + { + title: "hostname", + address: "kafka2:9092", + brokers: []string{"kafka1:9092", "kafka2:9092", "kafka3:9092"}, + index: 1, + exists: true, + }, + { + title: "hostname and default port", + address: "kafka2", + brokers: []string{"kafka1:9092", "kafka2:9092", "kafka3:9092"}, + index: 1, + exists: true, + }, + { + title: "hostname and default port doesn't exist", + address: "kafka2", + brokers: []string{"kafka1:9092", "kafka2:9093", "kafka3:9092"}, + exists: false, + }, + { + title: "hostname with ip brokers", + address: "kafka2:9092", + brokers: []string{"10.0.0.1:9092", "10.0.0.2:9092", "10.0.0.3:9092"}, + index: 1, + exists: true, + }, + { + title: "ip with named brokers", + address: "10.0.0.2:9092", + brokers: []string{"kafka1:9092", "kafka2:9092", "kafka3:9092"}, + index: 1, + exists: true, + }, + { + title: "ip with multiple local brokers without name", + address: "10.1.0.2:9094", + brokers: []string{"10.1.0.2:9092", "10.1.0.2:9093", "10.1.0.2:9094"}, + index: 2, + exists: true, + }, + } + + finder := brokerFinder{Net: &dummyNet{}} + for _, c := range cases { + t.Run(c.title, func(t *testing.T) { + i, found := finder.findAddress(c.address, c.brokers) + if c.exists { + if assert.True(t, found, "broker expected to be found") { + assert.Equal(t, c.index, i, "incorrect broker match") + } + } else { + assert.False(t, found, "broker shouldn't be found") + } + }) + } +}