Skip to content

Commit

Permalink
Fix kafka broker matching when port is not set (#8613)
Browse files Browse the repository at this point in the history
Fix kafka broker matching when port is not set and add tests
for broker matcher function.
  • Loading branch information
jsoriano authored Oct 17, 2018
1 parent 9812157 commit 1603e64
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Fix dropwizard module parsing of metric names. {issue}8365[8365] {pull}6385[8385]
- Added io disk read and write times to system module {issue}8473[8473] {pull}8508[8508]
- Avoid mapping issues in kubernetes module. {pull}8487[8487]
- Fix issue that would prevent kafka module to find a proper broker when port is not set {pull}8613[8613]

*Packetbeat*

Expand Down
74 changes: 52 additions & 22 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ func (b *Broker) Connect() error {
return err
}

other := findMatchingBroker(brokerAddress(b.broker), meta.Brokers)
finder := brokerFinder{Net: &defaultNet{}}
other := finder.findBroker(brokerAddress(b.broker), meta.Brokers)
if other == nil { // no broker found
closeBroker(b.broker)
return fmt.Errorf("No advertised broker with address %v found", b.Addr())
Expand Down Expand Up @@ -352,44 +353,73 @@ func checkRetryQuery(err error) (retry, reconnect bool) {
return false, false
}

func findMatchingBroker(
addr string,
brokers []*sarama.Broker,
) *sarama.Broker {
// NetInfo can be used to obtain network information
type NetInfo interface {
LookupIP(string) ([]net.IP, error)
LookupAddr(string) ([]string, error)
LocalIPAddrs() ([]net.IP, error)
Hostname() (string, error)
}

type defaultNet struct{}

// LookupIP looks up a host using the local resolver
func (m *defaultNet) LookupIP(addr string) ([]net.IP, error) {
return net.LookupIP(addr)
}

// LookupAddr returns the list of hosts resolving to an specific address
func (m *defaultNet) LookupAddr(address string) ([]string, error) {
return net.LookupAddr(address)
}

// LocalIPAddrs return the list of IP addresses configured in local network interfaces
func (m *defaultNet) LocalIPAddrs() ([]net.IP, error) {
return common.LocalIPAddrs()
}

// Hostname returns the hostname reported by the OS
func (m *defaultNet) Hostname() (string, error) {
return os.Hostname()
}

type brokerFinder struct {
Net NetInfo
}

func (m *brokerFinder) findBroker(addr string, brokers []*sarama.Broker) *sarama.Broker {
lst := brokerAddresses(brokers)
if idx, found := findMatchingAddress(addr, lst); found {
if idx, found := m.findAddress(addr, lst); found {
return brokers[idx]
}
return nil
}

func findMatchingAddress(
addr string,
brokers []string,
) (int, bool) {
func (m *brokerFinder) findAddress(addr string, brokers []string) (int, bool) {
debugf("Try to match broker to: %v", addr)

// compare connection address to list of broker addresses
if i, found := indexOf(addr, brokers); found {
return i, true
}

// get connection 'port'
host, port, err := net.SplitHostPort(addr)
if err != nil || port == "" {
host = addr
port = "9092"
}

// compare connection address to list of broker addresses
if i, found := indexOf(net.JoinHostPort(host, port), brokers); found {
return i, true
}

// lookup local machines ips for comparing with broker addresses
localIPs, err := common.LocalIPAddrs()
localIPs, err := m.Net.LocalIPAddrs()
if err != nil || len(localIPs) == 0 {
return -1, false
}
debugf("local machine ips: %v", localIPs)

// try to find broker by comparing the fqdn for each known ip to list of
// brokers
localHosts := lookupHosts(localIPs)
localHosts := m.lookupHosts(localIPs)
debugf("local machine addresses: %v", localHosts)
for _, host := range localHosts {
debugf("try to match with fqdn: %v (%v)", host, port)
Expand All @@ -401,7 +431,7 @@ func findMatchingAddress(
// try matching ip of configured host with broker list, this would
// match if hosts of advertised addresses are IPs, but configured host
// is a hostname
ips, err := net.LookupIP(host)
ips, err := m.Net.LookupIP(host)
if err == nil {
for _, ip := range ips {
addr := net.JoinHostPort(ip.String(), port)
Expand All @@ -413,7 +443,7 @@ func findMatchingAddress(

// try to find broker id by comparing the machines local hostname to
// broker hostnames in metadata
if host, err := os.Hostname(); err == nil {
if host, err := m.Net.Hostname(); err == nil {
debugf("try to match with hostname only: %v (%v)", host, port)

tmp := net.JoinHostPort(strings.ToLower(host), port)
Expand All @@ -437,7 +467,7 @@ func findMatchingAddress(
}

// lookup all ips for brokers host:
ips, err := net.LookupIP(bh)
ips, err := m.Net.LookupIP(bh)
debugf("broker %v ips: %v, %v", bh, ips, err)
if err != nil {
continue
Expand All @@ -454,15 +484,15 @@ func findMatchingAddress(
return -1, false
}

func lookupHosts(ips []net.IP) []string {
func (m *brokerFinder) lookupHosts(ips []net.IP) []string {
set := map[string]struct{}{}
for _, ip := range ips {
txt, err := ip.MarshalText()
if err != nil {
continue
}

hosts, err := net.LookupAddr(string(txt))
hosts, err := m.Net.LookupAddr(string(txt))
debugf("lookup %v => %v, %v", string(txt), hosts, err)
if err != nil {
continue
Expand Down
159 changes: 159 additions & 0 deletions metricbeat/module/kafka/broker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kafka

import (
"net"
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

type dummyNet struct{}

func (m *dummyNet) LookupIP(addr string) ([]net.IP, error) {
dns := map[string][]net.IP{
"kafka1": []net.IP{net.IPv4(10, 0, 0, 1)},
"kafka2": []net.IP{net.IPv4(10, 0, 0, 2)},
"kafka3": []net.IP{net.IPv4(10, 0, 0, 3)},
}
ips, found := dns[addr]
if !found {
return nil, errors.New("not found")
}
return ips, nil
}

func (m *dummyNet) LookupAddr(addr string) ([]string, error) {
dns := map[string][]string{
"10.0.0.1": []string{"kafka1"},
"10.0.0.2": []string{"kafka2"},
"10.0.0.3": []string{"kafka3"},
}
names, found := dns[addr]
if !found {
return nil, errors.New("not found")
}
return names, nil
}

func (m *dummyNet) LocalIPAddrs() ([]net.IP, error) {
return []net.IP{
net.IPv4(127, 0, 0, 1),
net.IPv4(10, 0, 0, 2),
net.IPv4(10, 1, 0, 2),
}, nil
}

func (m *dummyNet) Hostname() (string, error) {
return "kafka2", nil
}

func TestFindMatchingAddress(t *testing.T) {
cases := []struct {
title string
address string
brokers []string
index int
exists bool
}{
{
title: "exists",
address: "10.0.0.2:9092",
brokers: []string{"10.0.0.1:9092", "10.0.0.2:9092"},
index: 1,
exists: true,
},
{
title: "doesn't exist",
address: "8.8.8.8:9092",
brokers: []string{"10.0.0.1:9092", "10.0.0.2:9092"},
exists: false,
},
{
title: "exists on default port",
address: "10.0.0.2",
brokers: []string{"10.0.0.1:9092", "10.0.0.2:9092"},
index: 1,
exists: true,
},
{
title: "multiple brokers on same host",
address: "127.0.0.1:9093",
brokers: []string{"127.0.0.1:9092", "127.0.0.1:9093", "127.0.0.1:9094"},
index: 1,
exists: true,
},
{
title: "hostname",
address: "kafka2:9092",
brokers: []string{"kafka1:9092", "kafka2:9092", "kafka3:9092"},
index: 1,
exists: true,
},
{
title: "hostname and default port",
address: "kafka2",
brokers: []string{"kafka1:9092", "kafka2:9092", "kafka3:9092"},
index: 1,
exists: true,
},
{
title: "hostname and default port doesn't exist",
address: "kafka2",
brokers: []string{"kafka1:9092", "kafka2:9093", "kafka3:9092"},
exists: false,
},
{
title: "hostname with ip brokers",
address: "kafka2:9092",
brokers: []string{"10.0.0.1:9092", "10.0.0.2:9092", "10.0.0.3:9092"},
index: 1,
exists: true,
},
{
title: "ip with named brokers",
address: "10.0.0.2:9092",
brokers: []string{"kafka1:9092", "kafka2:9092", "kafka3:9092"},
index: 1,
exists: true,
},
{
title: "ip with multiple local brokers without name",
address: "10.1.0.2:9094",
brokers: []string{"10.1.0.2:9092", "10.1.0.2:9093", "10.1.0.2:9094"},
index: 2,
exists: true,
},
}

finder := brokerFinder{Net: &dummyNet{}}
for _, c := range cases {
t.Run(c.title, func(t *testing.T) {
i, found := finder.findAddress(c.address, c.brokers)
if c.exists {
if assert.True(t, found, "broker expected to be found") {
assert.Equal(t, c.index, i, "incorrect broker match")
}
} else {
assert.False(t, found, "broker shouldn't be found")
}
})
}
}

0 comments on commit 1603e64

Please sign in to comment.