Skip to content

Commit

Permalink
Merge pull request #2801 from hashicorp/spoken-hub-oss
Browse files Browse the repository at this point in the history
Adds support for WAN soft fail and join flooding.
  • Loading branch information
slackpad authored Mar 20, 2017
2 parents aabd802 + e91377f commit 3b3cb0d
Show file tree
Hide file tree
Showing 45 changed files with 2,428 additions and 991 deletions.
5 changes: 3 additions & 2 deletions api/coordinate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ type CoordinateEntry struct {
Coord *coordinate.Coordinate
}

// CoordinateDatacenterMap represents a datacenter and its associated WAN
// nodes and their associates coordinates.
// CoordinateDatacenterMap has the coordinates for servers in a given datacenter
// and area. Network coordinates are only compatible within the same area.
type CoordinateDatacenterMap struct {
Datacenter string
AreaID string
Coordinates []CoordinateEntry
}

Expand Down
158 changes: 158 additions & 0 deletions api/operator_area.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// The /v1/operator/area endpoints are available only in Consul Enterprise and
// interact with its network area subsystem. Network areas are used to link
// together Consul servers in different Consul datacenters. With network areas,
// Consul datacenters can be linked together in ways other than a fully-connected
// mesh, as is required for Consul's WAN.
package api

import (
"net"
"time"
)

// Area defines a network area.
type Area struct {
// ID is this identifier for an area (a UUID). This must be left empty
// when creating a new area.
ID string

// PeerDatacenter is the peer Consul datacenter that will make up the
// other side of this network area. Network areas always involve a pair
// of datacenters: the datacenter where the area was created, and the
// peer datacenter. This is required.
PeerDatacenter string

// RetryJoin specifies the address of Consul servers to join to, such as
// an IPs or hostnames with an optional port number. This is optional.
RetryJoin []string
}

// AreaJoinResponse is returned when a join occurs and gives the result for each
// address.
type AreaJoinResponse struct {
// The address that was joined.
Address string

// Whether or not the join was a success.
Joined bool

// If we couldn't join, this is the message with information.
Error string
}

// SerfMember is a generic structure for reporting information about members in
// a Serf cluster. This is only used by the area endpoints right now, but this
// could be expanded to other endpoints in the future.
type SerfMember struct {
// ID is the node identifier (a UUID).
ID string

// Name is the node name.
Name string

// Addr has the IP address.
Addr net.IP

// Port is the RPC port.
Port uint16

// Datacenter is the DC name.
Datacenter string

// Role is "client", "server", or "unknown".
Role string

// Build has the version of the Consul agent.
Build string

// Protocol is the protocol of the Consul agent.
Protocol int

// Status is the Serf health status "none", "alive", "leaving", "left",
// or "failed".
Status string

// RTT is the estimated round trip time from the server handling the
// request to the this member. This will be negative if no RTT estimate
// is available.
RTT time.Duration
}

// AreaCreate will create a new network area. The ID in the given structure must
// be empty and a generated ID will be returned on success.
func (op *Operator) AreaCreate(area *Area, q *WriteOptions) (string, *WriteMeta, error) {
r := op.c.newRequest("POST", "/v1/operator/area")
r.setWriteOptions(q)
r.obj = area
rtt, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return "", nil, err
}
defer resp.Body.Close()

wm := &WriteMeta{}
wm.RequestTime = rtt

var out struct{ ID string }
if err := decodeBody(resp, &out); err != nil {
return "", nil, err
}
return out.ID, wm, nil
}

// AreaList returns all the available network areas.
func (op *Operator) AreaList(q *QueryOptions) ([]*Area, *QueryMeta, error) {
var out []*Area
qm, err := op.c.query("/v1/operator/area", &out, q)
if err != nil {
return nil, nil, err
}
return out, qm, nil
}

// AreaDelete deletes the given network area.
func (op *Operator) AreaDelete(areaID string, q *WriteOptions) (*WriteMeta, error) {
r := op.c.newRequest("DELETE", "/v1/operator/area/"+areaID)
r.setWriteOptions(q)
rtt, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()

wm := &WriteMeta{}
wm.RequestTime = rtt
return wm, nil
}

// AreaJoin attempts to join the given set of join addresses to the given
// network area. See the Area structure for details about join addresses.
func (op *Operator) AreaJoin(areaID string, addresses []string, q *WriteOptions) ([]*AreaJoinResponse, *WriteMeta, error) {
r := op.c.newRequest("PUT", "/v1/operator/area/"+areaID+"/join")
r.setWriteOptions(q)
r.obj = addresses
rtt, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()

wm := &WriteMeta{}
wm.RequestTime = rtt

var out []*AreaJoinResponse
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return out, wm, nil
}

// AreaMembers lists the Serf information about the members in the given area.
func (op *Operator) AreaMembers(areaID string, q *QueryOptions) ([]*SerfMember, *QueryMeta, error) {
var out []*SerfMember
qm, err := op.c.query("/v1/operator/area/"+areaID+"/members", &out, q)
if err != nil {
return nil, nil, err
}
return out, qm, nil
}
10 changes: 0 additions & 10 deletions command/agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,6 @@ func TestAgent_Self(t *testing.T) {
t.Fatalf("meta fields are not equal: %v != %v", meta, val.Meta)
}

srv.agent.config.DisableCoordinates = true
obj, err = srv.AgentSelf(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
val = obj.(AgentSelf)
if val.Coord != nil {
t.Fatalf("should have been nil: %v", val.Coord)
}

// Make sure there's nothing called "token" that's leaked.
raw, err := srv.marshalJSON(req, obj)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion command/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func nextConfig() *Config {
cons.RaftConfig.HeartbeatTimeout = 40 * time.Millisecond
cons.RaftConfig.ElectionTimeout = 40 * time.Millisecond

cons.DisableCoordinates = false
cons.CoordinateUpdatePeriod = 100 * time.Millisecond
return conf
}
Expand Down
14 changes: 11 additions & 3 deletions command/rtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Usage: consul rtt [options] node1 [node2]
the datacenter (eg. "myserver.dc1").
It is not possible to measure between LAN coordinates and WAN coordinates
because they are maintained by independent Serf gossip pools, so they are
because they are maintained by independent Serf gossip areas, so they are
not compatible.
` + c.Command.Help()
Expand Down Expand Up @@ -102,21 +102,29 @@ func (c *RTTCommand) Run(args []string) int {
return 1
}

// See if the requested nodes are in there.
// See if the requested nodes are in there. We only compare
// coordinates in the same areas.
var area1, area2 string
for _, dc := range dcs {
for _, entry := range dc.Coordinates {
if dc.Datacenter == dc1 && entry.Node == node1 {
area1 = dc.AreaID
coord1 = entry.Coord
}
if dc.Datacenter == dc2 && entry.Node == node2 {
area2 = dc.AreaID
coord2 = entry.Coord
}

if coord1 != nil && coord2 != nil {
if area1 == area2 && coord1 != nil && coord2 != nil {
goto SHOW_RTT
}
}
}

// Nil out the coordinates so we don't display across areas if
// we didn't find anything.
coord1, coord2 = nil, nil
} else {
source = "LAN"

Expand Down
22 changes: 18 additions & 4 deletions consul/agent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Server struct {
ID string
Datacenter string
Port int
WanJoinPort int
Bootstrap bool
Expect int
Version int
Expand Down Expand Up @@ -81,16 +82,28 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
return false, nil
}

wan_join_port := 0
wan_join_port_str, ok := m.Tags["wan_join_port"]
if ok {
wan_join_port, err = strconv.Atoi(wan_join_port_str)
if err != nil {
return false, nil
}
}

vsn_str := m.Tags["vsn"]
vsn, err := strconv.Atoi(vsn_str)
if err != nil {
return false, nil
}

raft_vsn_str := m.Tags["raft_vsn"]
raft_vsn, err := strconv.Atoi(raft_vsn_str)
if err != nil {
return false, nil
raft_vsn := 0
raft_vsn_str, ok := m.Tags["raft_vsn"]
if ok {
raft_vsn, err = strconv.Atoi(raft_vsn_str)
if err != nil {
return false, nil
}
}

addr := &net.TCPAddr{IP: m.Addr, Port: port}
Expand All @@ -100,6 +113,7 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
ID: m.Tags["id"],
Datacenter: datacenter,
Port: port,
WanJoinPort: wan_join_port,
Bootstrap: bootstrap,
Expect: expect,
Addr: addr,
Expand Down
Loading

0 comments on commit 3b3cb0d

Please sign in to comment.