Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

export node State #223

Merged
merged 1 commit into from
May 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion memberlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func (m *Memberlist) hasLeft() bool {
return atomic.LoadInt32(&m.leave) == 1
}

func (m *Memberlist) getNodeState(addr string) nodeStateType {
func (m *Memberlist) getNodeState(addr string) NodeStateType {
m.nodeLock.RLock()
defer m.nodeLock.RUnlock()

Expand Down
8 changes: 4 additions & 4 deletions memberlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,9 +604,9 @@ func TestMemberList_Members(t *testing.T) {

m := &Memberlist{}
nodes := []*nodeState{
&nodeState{Node: *n1, State: stateAlive},
&nodeState{Node: *n2, State: stateDead},
&nodeState{Node: *n3, State: stateSuspect},
&nodeState{Node: *n1, State: StateAlive},
&nodeState{Node: *n2, State: StateDead},
&nodeState{Node: *n3, State: StateSuspect},
}
m.nodes = nodes

Expand Down Expand Up @@ -992,7 +992,7 @@ func TestMemberlist_Leave(t *testing.T) {
t.Fatalf("should have 1 node")
}

if m2.nodeMap[c1.Name].State != stateLeft {
if m2.nodeMap[c1.Name].State != StateLeft {
t.Fatalf("bad state")
}
}
Expand Down
23 changes: 12 additions & 11 deletions net.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ type pushNodeState struct {
Port uint16
Meta []byte
Incarnation uint32
State nodeStateType
State NodeStateType
Vsn []uint8 // Protocol versions
}

Expand Down Expand Up @@ -1148,16 +1148,17 @@ func (m *Memberlist) mergeRemoteState(join bool, remoteNodes []pushNodeState, us
nodes := make([]*Node, len(remoteNodes))
for idx, n := range remoteNodes {
nodes[idx] = &Node{
Name: n.Name,
Addr: n.Addr,
Port: n.Port,
Meta: n.Meta,
PMin: n.Vsn[0],
PMax: n.Vsn[1],
PCur: n.Vsn[2],
DMin: n.Vsn[3],
DMax: n.Vsn[4],
DCur: n.Vsn[5],
Name: n.Name,
Addr: n.Addr,
Port: n.Port,
Meta: n.Meta,
State: n.State,
PMin: n.Vsn[0],
PMax: n.Vsn[1],
PCur: n.Vsn[2],
DMin: n.Vsn[3],
DMax: n.Vsn[4],
DCur: n.Vsn[5],
}
}
if err := m.config.Merge.NotifyMerge(nodes); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func TestTCPPushPull(t *testing.T) {
Port: uint16(m.config.BindPort),
},
Incarnation: 0,
State: stateSuspect,
State: StateSuspect,
StateChange: time.Now().Add(-1 * time.Second),
})

Expand All @@ -459,17 +459,17 @@ func TestTCPPushPull(t *testing.T) {
localNodes[0].Addr = net.ParseIP(m.config.BindAddr)
localNodes[0].Port = uint16(m.config.BindPort)
localNodes[0].Incarnation = 1
localNodes[0].State = stateAlive
localNodes[0].State = StateAlive
localNodes[1].Name = "Test 1"
localNodes[1].Addr = net.ParseIP(m.config.BindAddr)
localNodes[1].Port = uint16(m.config.BindPort)
localNodes[1].Incarnation = 1
localNodes[1].State = stateAlive
localNodes[1].State = StateAlive
localNodes[2].Name = "Test 2"
localNodes[2].Addr = net.ParseIP(m.config.BindAddr)
localNodes[2].Port = uint16(m.config.BindPort)
localNodes[2].Incarnation = 1
localNodes[2].State = stateAlive
localNodes[2].State = StateAlive

// Send our node state
header := pushPullHeader{Nodes: 3}
Expand Down Expand Up @@ -552,7 +552,7 @@ func TestTCPPushPull(t *testing.T) {
if n.Incarnation != 0 {
t.Fatal("bad incarnation")
}
if n.State != stateSuspect {
if n.State != StateSuspect {
t.Fatal("bad state")
}
}
Expand Down
77 changes: 39 additions & 38 deletions state.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,28 @@ import (
metrics "github.com/armon/go-metrics"
)

type nodeStateType int
type NodeStateType int

const (
stateAlive nodeStateType = iota
stateSuspect
stateDead
stateLeft
StateAlive NodeStateType = iota
StateSuspect
StateDead
StateLeft
)

// Node represents a node in the cluster.
type Node struct {
Name string
Addr net.IP
Port uint16
Meta []byte // Metadata from the delegate for this node.
PMin uint8 // Minimum protocol version this understands
PMax uint8 // Maximum protocol version this understands
PCur uint8 // Current version node is speaking
DMin uint8 // Min protocol version for the delegate to understand
DMax uint8 // Max protocol version for the delegate to understand
DCur uint8 // Current version delegate is speaking
Name string
Addr net.IP
Port uint16
Meta []byte // Metadata from the delegate for this node.
State NodeStateType // State of the node.
PMin uint8 // Minimum protocol version this understands
PMax uint8 // Maximum protocol version this understands
PCur uint8 // Current version node is speaking
DMin uint8 // Min protocol version for the delegate to understand
DMax uint8 // Max protocol version for the delegate to understand
DCur uint8 // Current version delegate is speaking
}

// Address returns the host:port form of a node's address, suitable for use
Expand All @@ -60,7 +61,7 @@ func (n *Node) String() string {
type nodeState struct {
Node
Incarnation uint32 // Last known incarnation number
State nodeStateType // Current state
State NodeStateType // Current state
StateChange time.Time // Time last state change happened
}

Expand All @@ -77,7 +78,7 @@ func (n *nodeState) FullAddress() Address {
}

func (n *nodeState) DeadOrLeft() bool {
return n.State == stateDead || n.State == stateLeft
return n.State == StateDead || n.State == StateLeft
}

// ackHandler is used to register handlers for incoming acks and nacks.
Expand Down Expand Up @@ -321,7 +322,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
defer func() {
m.awareness.ApplyDelta(awarenessDelta)
}()
if node.State == stateAlive {
if node.State == StateAlive {
if err := m.encodeAndSendMsg(node.FullAddress(), pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
if failedRemote(err) {
Expand Down Expand Up @@ -396,7 +397,7 @@ HANDLE_REMOTE_FAILURE:
kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
return n.Name == m.config.Name ||
n.Name == node.Name ||
n.State != stateAlive
n.State != StateAlive
})
m.nodeLock.RUnlock()

Expand Down Expand Up @@ -573,10 +574,10 @@ func (m *Memberlist) gossip() {
}

switch n.State {
case stateAlive, stateSuspect:
case StateAlive, StateSuspect:
return false

case stateDead:
case StateDead:
return time.Since(n.StateChange) > m.config.GossipToTheDeadTime

default:
Expand Down Expand Up @@ -623,7 +624,7 @@ func (m *Memberlist) pushPull() {
m.nodeLock.RLock()
nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool {
return n.Name == m.config.Name ||
n.State != stateAlive
n.State != StateAlive
})
m.nodeLock.RUnlock()

Expand Down Expand Up @@ -681,7 +682,7 @@ func (m *Memberlist) verifyProtocol(remote []pushNodeState) error {

for _, rn := range remote {
// If the node isn't alive, then skip it
if rn.State != stateAlive {
if rn.State != StateAlive {
continue
}

Expand Down Expand Up @@ -710,7 +711,7 @@ func (m *Memberlist) verifyProtocol(remote []pushNodeState) error {

for _, n := range m.nodes {
// Ignore non-alive nodes
if n.State != stateAlive {
if n.State != StateAlive {
continue
}

Expand Down Expand Up @@ -981,7 +982,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
Port: a.Port,
Meta: a.Meta,
},
State: stateDead,
State: StateDead,
}
if len(a.Vsn) > 5 {
state.PMin = a.Vsn[0]
Expand Down Expand Up @@ -1021,7 +1022,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
time.Since(state.StateChange) > m.config.DeadNodeReclaimTime)

// Allow the address to be updated if a dead node is being replaced.
if state.State == stateLeft || (state.State == stateDead && canReclaim) {
if state.State == StateLeft || (state.State == StateDead && canReclaim) {
m.logger.Printf("[INFO] memberlist: Updating address for left or failed node %s from %v:%d to %v:%d",
state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port)
updatesNode = true
Expand Down Expand Up @@ -1106,8 +1107,8 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
state.Meta = a.Meta
state.Addr = a.Addr
state.Port = a.Port
if state.State != stateAlive {
state.State = stateAlive
if state.State != StateAlive {
state.State = StateAlive
state.StateChange = time.Now()
}
}
Expand All @@ -1117,7 +1118,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {

// Notify the delegate of any relevant updates
if m.config.Events != nil {
if oldState == stateDead || oldState == stateLeft {
if oldState == StateDead || oldState == StateLeft {
// if Dead/Left -> Alive, notify of join
m.config.Events.NotifyJoin(&state.Node)

Expand Down Expand Up @@ -1157,7 +1158,7 @@ func (m *Memberlist) suspectNode(s *suspect) {
}

// Ignore non-alive nodes
if state.State != stateAlive {
if state.State != StateAlive {
return
}

Expand All @@ -1175,7 +1176,7 @@ func (m *Memberlist) suspectNode(s *suspect) {

// Update the state
state.Incarnation = s.Incarnation
state.State = stateSuspect
state.State = StateSuspect
changeTime := time.Now()
state.StateChange = changeTime

Expand All @@ -1199,7 +1200,7 @@ func (m *Memberlist) suspectNode(s *suspect) {
fn := func(numConfirmations int) {
m.nodeLock.Lock()
state, ok := m.nodeMap[s.Node]
timeout := ok && state.State == stateSuspect && state.StateChange == changeTime
timeout := ok && state.State == StateSuspect && state.StateChange == changeTime
m.nodeLock.Unlock()

if timeout {
Expand Down Expand Up @@ -1265,9 +1266,9 @@ func (m *Memberlist) deadNode(d *dead) {
// If the dead message was send by the node itself, mark it is left
// instead of dead.
if d.Node == d.From {
state.State = stateLeft
state.State = StateLeft
} else {
state.State = stateDead
state.State = StateDead
}
state.StateChange = time.Now()

Expand All @@ -1282,7 +1283,7 @@ func (m *Memberlist) deadNode(d *dead) {
func (m *Memberlist) mergeState(remote []pushNodeState) {
for _, r := range remote {
switch r.State {
case stateAlive:
case StateAlive:
a := alive{
Incarnation: r.Incarnation,
Node: r.Name,
Expand All @@ -1293,14 +1294,14 @@ func (m *Memberlist) mergeState(remote []pushNodeState) {
}
m.aliveNode(&a, nil, false)

case stateLeft:
case StateLeft:
d := dead{Incarnation: r.Incarnation, Node: r.Name, From: r.Name}
m.deadNode(&d)
case stateDead:
case StateDead:
// If the remote node believes a node is dead, we prefer to
// suspect that node instead of declaring it dead instantly
fallthrough
case stateSuspect:
case StateSuspect:
s := suspect{Incarnation: r.Incarnation, Node: r.Name, From: m.config.Name}
m.suspectNode(&s)
}
Expand Down
Loading