Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements WAN address translation. #1698

Merged
merged 19 commits into from
Feb 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
config.AdvertiseAddrWan = config.AdvertiseAddr
}

// Create the default set of tagged addresses.
config.TaggedAddresses = map[string]string{
"wan": config.AdvertiseAddrWan,
}

agent := &Agent{
config: config,
logger: log.New(logOutput, "", log.LstdFlags),
Expand Down
6 changes: 6 additions & 0 deletions command/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ func TestAgent_CheckAdvertiseAddrsSettings(t *testing.T) {
if rpc != c.AdvertiseAddrs.RPC {
t.Fatalf("RPC is not properly set to %v: %s", c.AdvertiseAddrs.RPC, rpc)
}
expected := map[string]string{
"wan": agent.config.AdvertiseAddrWan,
}
if !reflect.DeepEqual(agent.config.TaggedAddresses, expected) {
t.Fatalf("Tagged addresses not set up properly: %v", agent.config.TaggedAddresses)
}
}

func TestAgent_AddService(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions command/agent/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func TestRetryJoin(t *testing.T) {
"-server",
"-data-dir", tmpDir,
"-node", fmt.Sprintf(`"%s"`, conf2.NodeName),
"-advertise", agent.config.BindAddr,
"-retry-join", serfAddr,
"-retry-interval", "1s",
"-retry-join-wan", serfWanAddr,
Expand Down
16 changes: 16 additions & 0 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,25 @@ type Config struct {
// Serf WAN IP. If not specified, the general advertise address is used.
AdvertiseAddrWan string `mapstructure:"advertise_addr_wan"`

// TranslateWanAddrs controls whether or not Consul should prefer
// the "wan" tagged address when doing lookups in remote datacenters.
// See TaggedAddresses below for more details.
TranslateWanAddrs bool `mapstructure:"translate_wan_addrs"`

// Port configurations
Ports PortConfig

// Address configurations
Addresses AddressConfig

// Tagged addresses. These are used to publish a set of addresses for
// for a node, which can be used by the remote agent. We currently
// populate only the "wan" tag based on the SerfWan advertise address,
// but this structure is here for possible future features with other
// user-defined tags. The "wan" tag will be used by remote agents if
// they are configured with TranslateWanAddrs set to true.
TaggedAddresses map[string]string

// LeaveOnTerm controls if Serf does a graceful leave when receiving
// the TERM signal. Defaults false. This can be changed on reload.
LeaveOnTerm bool `mapstructure:"leave_on_terminate"`
Expand Down Expand Up @@ -968,6 +981,9 @@ func MergeConfig(a, b *Config) *Config {
if b.AdvertiseAddrWan != "" {
result.AdvertiseAddrWan = b.AdvertiseAddrWan
}
if b.TranslateWanAddrs == true {
result.TranslateWanAddrs = true
}
if b.AdvertiseAddrs.SerfLan != nil {
result.AdvertiseAddrs.SerfLan = b.AdvertiseAddrs.SerfLan
result.AdvertiseAddrs.SerfLanRaw = b.AdvertiseAddrs.SerfLanRaw
Expand Down
19 changes: 19 additions & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,25 @@ func TestDecodeConfig(t *testing.T) {
t.Fatalf("bad: %#v", config)
}

// WAN address translation disabled by default
config, err = DecodeConfig(bytes.NewReader([]byte(`{}`)))
if err != nil {
t.Fatalf("err: %s", err)
}
if config.TranslateWanAddrs != false {
t.Fatalf("bad: %#v", config)
}

// WAN address translation
input = `{"translate_wan_addrs": true}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if config.TranslateWanAddrs != true {
t.Fatalf("bad: %#v", config)
}

// leave_on_terminate
input = `{"leave_on_terminate": true}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
Expand Down
35 changes: 26 additions & 9 deletions command/agent/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,19 @@ INVALID:
resp.SetRcode(req, dns.RcodeNameError)
}

// translateAddr is used to provide the final, translated address for a node,
// depending on how this agent and the other node are configured.
func (d *DNSServer) translateAddr(dc string, node *structs.Node) string {
addr := node.Address
if d.agent.config.TranslateWanAddrs && (d.agent.config.Datacenter != dc) {
wanAddr := node.TaggedAddresses["wan"]
if wanAddr != "" {
addr = wanAddr
}
}
return addr
}

// nodeLookup is used to handle a node query
func (d *DNSServer) nodeLookup(network, datacenter, node string, req, resp *dns.Msg) {
// Only handle ANY, A and AAAA type requests
Expand Down Expand Up @@ -403,7 +416,8 @@ RPC:
}

// Add the node record
records := d.formatNodeRecord(out.NodeServices.Node, out.NodeServices.Node.Address,
addr := d.translateAddr(datacenter, out.NodeServices.Node)
records := d.formatNodeRecord(out.NodeServices.Node, addr,
req.Question[0].Name, qType, d.config.NodeTTL)
if records != nil {
resp.Answer = append(resp.Answer, records...)
Expand Down Expand Up @@ -526,7 +540,7 @@ RPC:

// Add various responses depending on the request
qType := req.Question[0].Qtype
d.serviceNodeRecords(out.Nodes, req, resp, ttl)
d.serviceNodeRecords(datacenter, out.Nodes, req, resp, ttl)

if qType == dns.TypeSRV {
d.serviceSRVRecords(datacenter, out.Nodes, req, resp, ttl)
Expand Down Expand Up @@ -622,7 +636,7 @@ RPC:

// Add various responses depending on the request.
qType := req.Question[0].Qtype
d.serviceNodeRecords(out.Nodes, req, resp, ttl)
d.serviceNodeRecords(datacenter, out.Nodes, req, resp, ttl)
if qType == dns.TypeSRV {
d.serviceSRVRecords(datacenter, out.Nodes, req, resp, ttl)
}
Expand All @@ -646,18 +660,20 @@ RPC:
}

// serviceNodeRecords is used to add the node records for a service lookup
func (d *DNSServer) serviceNodeRecords(nodes structs.CheckServiceNodes, req, resp *dns.Msg, ttl time.Duration) {
func (d *DNSServer) serviceNodeRecords(dc string, nodes structs.CheckServiceNodes, req, resp *dns.Msg, ttl time.Duration) {
qName := req.Question[0].Name
qType := req.Question[0].Qtype
handled := make(map[string]struct{})
for _, node := range nodes {
// Avoid duplicate entries, possible if a node has
// the same service on multiple ports, etc.
addr := node.Node.Address
// Start with the translated address but use the service address,
// if specified.
addr := d.translateAddr(dc, node.Node)
if node.Service.Address != "" {
addr = node.Service.Address
}

// Avoid duplicate entries, possible if a node has
// the same service on multiple ports, etc.
if _, ok := handled[addr]; ok {
continue
}
Expand Down Expand Up @@ -698,8 +714,9 @@ func (d *DNSServer) serviceSRVRecords(dc string, nodes structs.CheckServiceNodes
}
resp.Answer = append(resp.Answer, srvRec)

// Determine advertised address
addr := node.Node.Address
// Start with the translated address but use the service address,
// if specified.
addr := d.translateAddr(dc, node.Node)
if node.Service.Address != "" {
addr = node.Service.Address
}
Expand Down
191 changes: 191 additions & 0 deletions command/agent/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func TestDNS_NodeLookup(t *testing.T) {
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
TaggedAddresses: map[string]string{
"wan": "127.0.0.2",
},
}

var out struct{}
Expand Down Expand Up @@ -715,6 +718,194 @@ func TestDNS_ServiceLookup_ServiceAddress(t *testing.T) {
}
}

func TestDNS_ServiceLookup_WanAddress(t *testing.T) {
dir1, srv1 := makeDNSServerConfig(t,
func(c *Config) {
c.Datacenter = "dc1"
c.TranslateWanAddrs = true
}, nil)
defer os.RemoveAll(dir1)
defer srv1.Shutdown()

dir2, srv2 := makeDNSServerConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.TranslateWanAddrs = true
}, nil)
defer os.RemoveAll(dir2)
defer srv2.Shutdown()

testutil.WaitForLeader(t, srv1.agent.RPC, "dc1")
testutil.WaitForLeader(t, srv2.agent.RPC, "dc2")

// Join WAN cluster
addr := fmt.Sprintf("127.0.0.1:%d",
srv1.agent.config.Ports.SerfWan)
if _, err := srv2.agent.JoinWAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}

testutil.WaitForResult(
func() (bool, error) {
return len(srv1.agent.WANMembers()) > 1, nil
},
func(err error) {
t.Fatalf("Failed waiting for WAN join: %v", err)
})

// Register a remote node with a service.
{
args := &structs.RegisterRequest{
Datacenter: "dc2",
Node: "foo",
Address: "127.0.0.1",
TaggedAddresses: map[string]string{
"wan": "127.0.0.2",
},
Service: &structs.NodeService{
Service: "db",
},
}

var out struct{}
if err := srv2.agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
}

// Register an equivalent prepared query.
var id string
{
args := &structs.PreparedQueryRequest{
Datacenter: "dc2",
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
Service: structs.ServiceQuery{
Service: "db",
},
},
}
if err := srv2.agent.RPC("PreparedQuery.Apply", args, &id); err != nil {
t.Fatalf("err: %v", err)
}
}

// Look up the SRV record via service and prepared query.
questions := []string{
"db.service.dc2.consul.",
id + ".query.dc2.consul.",
}
for _, question := range questions {
m := new(dns.Msg)
m.SetQuestion(question, dns.TypeSRV)

c := new(dns.Client)
addr, _ := srv1.agent.config.ClientListener("", srv1.agent.config.Ports.DNS)
in, _, err := c.Exchange(m, addr.String())
if err != nil {
t.Fatalf("err: %v", err)
}

if len(in.Answer) != 1 {
t.Fatalf("Bad: %#v", in)
}

aRec, ok := in.Extra[0].(*dns.A)
if !ok {
t.Fatalf("Bad: %#v", in.Extra[0])
}
if aRec.Hdr.Name != "foo.node.dc2.consul." {
t.Fatalf("Bad: %#v", in.Extra[0])
}
if aRec.A.String() != "127.0.0.2" {
t.Fatalf("Bad: %#v", in.Extra[0])
}
}

// Also check the A record directly
for _, question := range questions {
m := new(dns.Msg)
m.SetQuestion(question, dns.TypeA)

c := new(dns.Client)
addr, _ := srv1.agent.config.ClientListener("", srv1.agent.config.Ports.DNS)
in, _, err := c.Exchange(m, addr.String())
if err != nil {
t.Fatalf("err: %v", err)
}

if len(in.Answer) != 1 {
t.Fatalf("Bad: %#v", in)
}

aRec, ok := in.Answer[0].(*dns.A)
if !ok {
t.Fatalf("Bad: %#v", in.Answer[0])
}
if aRec.Hdr.Name != question {
t.Fatalf("Bad: %#v", in.Answer[0])
}
if aRec.A.String() != "127.0.0.2" {
t.Fatalf("Bad: %#v", in.Answer[0])
}
}

// Now query from the same DC and make sure we get the local address
for _, question := range questions {
m := new(dns.Msg)
m.SetQuestion(question, dns.TypeSRV)

c := new(dns.Client)
addr, _ := srv2.agent.config.ClientListener("", srv2.agent.config.Ports.DNS)
in, _, err := c.Exchange(m, addr.String())
if err != nil {
t.Fatalf("err: %v", err)
}

if len(in.Answer) != 1 {
t.Fatalf("Bad: %#v", in)
}

aRec, ok := in.Extra[0].(*dns.A)
if !ok {
t.Fatalf("Bad: %#v", in.Extra[0])
}
if aRec.Hdr.Name != "foo.node.dc2.consul." {
t.Fatalf("Bad: %#v", in.Extra[0])
}
if aRec.A.String() != "127.0.0.1" {
t.Fatalf("Bad: %#v", in.Extra[0])
}
}

// Also check the A record directly from DC2
for _, question := range questions {
m := new(dns.Msg)
m.SetQuestion(question, dns.TypeA)

c := new(dns.Client)
addr, _ := srv2.agent.config.ClientListener("", srv2.agent.config.Ports.DNS)
in, _, err := c.Exchange(m, addr.String())
if err != nil {
t.Fatalf("err: %v", err)
}

if len(in.Answer) != 1 {
t.Fatalf("Bad: %#v", in)
}

aRec, ok := in.Answer[0].(*dns.A)
if !ok {
t.Fatalf("Bad: %#v", in.Answer[0])
}
if aRec.Hdr.Name != question {
t.Fatalf("Bad: %#v", in.Answer[0])
}
if aRec.A.String() != "127.0.0.1" {
t.Fatalf("Bad: %#v", in.Answer[0])
}
}
}

func TestDNS_CaseInsensitiveServiceLookup(t *testing.T) {
dir, srv := makeDNSServer(t)
defer os.RemoveAll(dir)
Expand Down
Loading