In some rare cases, 2 leaders may be present in the Nomad server cluster #4749
It seems the fact that leadership can be lost and re-acquired in quick succession (leader flapping) is not taken into account. On the graph below we can see that the last leadership loss led to a deadlock, which stopped raft from functioning correctly and from answering heartbeats. That in turn caused goroutine counts to grow until all RAM was consumed (in effect, a memory leak) - look at the graph below.
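As background on why a stuck consumer can stall raft itself: hashicorp/raft documents that it blocks when writing leadership changes to the configured notify channel, so if the goroutine reading that channel gets stuck, the sender stalls with it. The toy sketch below (not Nomad's actual code) shows the mechanism:

```go
package main

import (
	"fmt"
	"time"
)

// Toy model of the failure mode described above: the "raft" side blocks while
// delivering a leadership notification because the consumer stopped reading.
func main() {
	notifyCh := make(chan bool) // unbuffered and not drained

	delivered := make(chan struct{})
	go func() {
		// "raft": tries to deliver a leadership change.
		notifyCh <- true
		close(delivered)
	}()

	// "monitorLeadership": never reads again because it is stuck waiting on
	// cleanup (stands in for a leaderLoop.Wait() that never returns).

	select {
	case <-delivered:
		fmt.Println("notification delivered")
	case <-time.After(2 * time.Second):
		fmt.Println("raft side is stalled: nothing is reading the notify channel")
	}
}
```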
To prevent raft leader flapping we applied the following patch (this is only a prototype to describe the idea, not working code):

```diff
diff --git a/nomad/leader.go b/nomad/leader.go
index 119e72ebf..6cbf37c65 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -46,11 +46,11 @@ var minAutopilotVersion = version.Must(version.NewVersion("0.8.0"))
func (s *Server) monitorLeadership() {
var weAreLeaderCh chan struct{}
var leaderLoop sync.WaitGroup
+ var stopMonitorLeaderFlapingWg sync.WaitGroup
for {
select {
case isLeader := <-s.leaderCh:
- switch {
- case isLeader:
+ if isLeader {
if weAreLeaderCh != nil {
s.logger.Printf("[ERR] nomad: attempted to start the leader loop while running")
continue
@@ -58,13 +58,10 @@ func (s *Server) monitorLeadership() {
weAreLeaderCh = make(chan struct{})
leaderLoop.Add(1)
- go func(ch chan struct{}) {
- defer leaderLoop.Done()
- s.leaderLoop(ch)
- }(weAreLeaderCh)
+ go s.leaderLoop(weAreLeaderCh, &leaderLoop)
s.logger.Printf("[INFO] nomad: cluster leadership acquired")
- default:
+ } else {
if weAreLeaderCh == nil {
s.logger.Printf("[ERR] nomad: attempted to stop the leader loop while not running")
continue
@@ -72,7 +69,33 @@ func (s *Server) monitorLeadership() {
s.logger.Printf("[DEBUG] nomad: shutting down leader loop")
close(weAreLeaderCh)
+
+ stopMonitorLeaderFlaping := make(chan struct{})
+ stopMonitorLeaderFlapingWg.Add(1)
+
+ go func() {
+ defer stopMonitorLeaderFlapingWg.Done()
+ beforeWaitLeaderState := isLeader
+
+ for stop := false; !stop; {
+ select {
+ case isLeader = <-s.leaderCh:
+ case <-stopMonitorLeaderFlaping:
+ stop = true // a bare break here would only exit the select, not the loop
+ }
+ }
+
+ if beforeWaitLeaderState != isLeader {
+ select {
+ case s.leaderCh <- isLeader:
+ default:
+ }
+ }
+ }()
+
leaderLoop.Wait()
+ close(stopMonitorLeaderFlaping)
+ stopMonitorLeaderFlapingWg.Wait()
weAreLeaderCh = nil
s.logger.Printf("[INFO] nomad: cluster leadership lost")
}
@@ -85,7 +108,8 @@ func (s *Server) monitorLeadership() {
// leaderLoop runs as long as we are the leader to run various
// maintenance activities
-func (s *Server) leaderLoop(stopCh chan struct{}) {
+func (s *Server) leaderLoop(stopCh chan struct{}, wg *sync.WaitGroup) {
+ defer wg.Done()
var reconcileCh chan serf.Member
establishedLeader := false
@@ -135,6 +159,7 @@ RECONCILE:
// updates
reconcileCh = s.reconcileCh
+WAIT:
// Poll the stop channel to give it priority so we don't waste time
// trying to perform the other operations if we have been asked to shut
// down.
@@ -144,7 +169,6 @@ RECONCILE:
default:
}
-WAIT:
// Wait until leadership is lost
for {
select {
```

The key idea is to keep draining s.leaderCh while the previous leader loop shuts down, so rapid leadership flapping is absorbed instead of blocking raft, and to re-deliver only the final state if it differs from what we saw before waiting. @dadgar What do you think about this?
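A standalone sketch of that drain-and-re-deliver idea (illustrative only, not the patch itself): keep reading the leadership channel while the old leader loop finishes so the sender never blocks, then hand back the final state only if it changed.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// drainWhile keeps reading leaderCh until stop is closed so the sender never
// blocks; it returns the last value observed (or initial if none arrived).
func drainWhile(leaderCh chan bool, initial bool, stop <-chan struct{}) bool {
	last := initial
	for {
		select {
		case v := <-leaderCh:
			last = v
		case <-stop:
			return last
		}
	}
}

func main() {
	leaderCh := make(chan bool)
	var leaderLoop sync.WaitGroup

	// Pretend the old leader loop takes a while to shut down.
	leaderLoop.Add(1)
	go func() {
		defer leaderLoop.Done()
		time.Sleep(300 * time.Millisecond)
	}()

	// Meanwhile leadership flaps a few times.
	go func() {
		for _, v := range []bool{true, false, true} {
			leaderCh <- v
			time.Sleep(50 * time.Millisecond)
		}
	}()

	stop := make(chan struct{})
	final := make(chan bool, 1)
	go func() { final <- drainWhile(leaderCh, false, stop) }()

	leaderLoop.Wait() // old loop finished; stop absorbing flaps
	close(stop)

	before := false // state observed before waiting
	if got := <-final; got != before {
		fmt.Println("re-delivering final leadership state:", got)
	}
}
```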
Hey there! Since this issue hasn't had any activity in a while - we're going to automatically close it in 30 days. If you're still seeing this issue with the latest version of Nomad, please respond here and we'll keep this open and take another look at this. Thanks!
This issue will be auto-closed because there hasn't been any activity for a few months. Feel free to open a new one if you still experience this problem 👍
Re-opening this as per the discussion in #6620.
A couple of notes, after some discussion w/ my colleagues:
It would be good if we can reduce the window for leadership transitions just from a performance perspective, but in #4749 (comment) it looks like the machines may also just be seriously underspecced -- the AWS t2 instance class in particular.

That being said, we still need to correct the problem here with the goroutine leak, which, according to the work @tantra35 did in #6620, looks to be related to RPC connections being left open. That should narrow down the area we need to investigate. I'll pick up the reproductions I was doing in #6620 and see if I can provoke some leader election flapping.
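A hedged way to watch for the goroutine growth mentioned above (not something from this thread): if the server agent runs with enable_debug = true, Go's pprof endpoints are exposed on the agent's HTTP port and the total goroutine count can be polled. The address and interval below are assumptions:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"
)

func main() {
	for {
		// With debug=1 the endpoint returns a text report whose first line is
		// "goroutine profile: total N".
		resp, err := http.Get("http://127.0.0.1:4646/debug/pprof/goroutine?debug=1")
		if err != nil {
			fmt.Println("request failed:", err)
		} else {
			body, _ := io.ReadAll(resp.Body)
			resp.Body.Close()
			first := strings.SplitN(string(body), "\n", 2)[0]
			fmt.Println(time.Now().Format(time.RFC3339), first)
		}
		time.Sleep(30 * time.Second)
	}
}
```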
Hi @tantra35. Thank you so much for your patience and the level of detail here; it helped a lot in identifying some of the issues. I believe there are a few issues at play worth fixing.

First, it does look like raft leader flapping can result in a deadlock that makes much of Nomad inoperable, and it may cause goroutines to grow while raft waits on the deadlock. I have a write-up and details in #6977. It's basically your fix, while trying to avoid blocking raft. Consul apparently had a similar issue in hashicorp/consul#6852. This somewhat explains why we didn't see the log messages I'd expect about stepping down, and why blocked evals spiked (because the server still thought of itself as the leader) but never drained (because no server would pick them up or process them).

Second, there is the hung RPC connections issue of #6620. I conjecture that raft hanging can cause some RPCs to hang, especially since your patch here seems to have addressed the problem there and the blocked_evals spike is correlated with hung RPC connections. I'm still researching how your cluster got into this situation, but now I have some ideas for how to reproduce it.

Third, even if an RPC connection hangs due to a raft deadlock, I would have expected Yamux to time out and drop the connection eventually. We are looking into this too and may have a bug there. Sadly, when chaining calls, net/rpc and hashicorp/net-rpc-msgpackrpc don't provide a mechanism (that I know of) to propagate cancellation, so it's expected that in-flight RPCs complete even if the triggering HTTP API call is cancelled; but I would have expected the RPC call to return quickly or time out.

As I work on a reproduction, I would love the following info:
Thank you again for the info and your patience - we will get to the bottom of this :).
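On the cancellation point above: with net/rpc (which hashicorp/net-rpc-msgpackrpc builds on), a caller can stop waiting for a reply but cannot cancel the call already in flight on the server. A minimal sketch of that caller-side timeout pattern; the address and the "Example.Ping" method are placeholders, not Nomad's real RPC plumbing:

```go
package main

import (
	"errors"
	"log"
	"net/rpc"
	"time"
)

// callWithTimeout stops waiting for the reply after d. Note that the server
// keeps processing the request: net/rpc offers no way to propagate the
// cancellation, which is the limitation discussed above.
func callWithTimeout(c *rpc.Client, method string, args, reply interface{}, d time.Duration) error {
	call := c.Go(method, args, reply, make(chan *rpc.Call, 1))
	select {
	case done := <-call.Done:
		return done.Error
	case <-time.After(d):
		return errors.New("rpc timed out: " + method)
	}
}

func main() {
	client, err := rpc.Dial("tcp", "127.0.0.1:4647") // placeholder address
	if err != nil {
		log.Fatal(err)
	}
	var reply string
	if err := callWithTimeout(client, "Example.Ping", struct{}{}, &reply, 10*time.Second); err != nil {
		log.Println(err)
	}
}
```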
@notnoop Hello, first of all great work!!!
Also, I must say that despite our approach we still observed a leak of sockets, but this was due to cross-region API calls (we had one place from which we made API calls to multiple regions), and it did not correlate with leader flapping. But that was some time ago; we changed our statistics-collection scheme - we now place a collector in every region, which eliminates the cross-region calls - and then we upgraded to Nomad 0.9.x (where we forgot to add the patch described here and again got problems).

Also we patched yamux (https://github.com/hashicorp/yamux) to add a timeout to stream opening. Here is the Nomad side of the change:

```diff
diff --git "a/nomad/client_rpc.go" "b/nomad/client_rpc.go"
index ca8db2336..405858bd4 100644
--- "a/nomad/client_rpc.go"
+++ "b/nomad/client_rpc.go"
@@ -217,7 +217,7 @@ func (s *Server) serverWithNodeConn(nodeID, region string) (*serverParts, error)
// Yamux session for the node and the method to be called.
func NodeRpc(session *yamux.Session, method string, args, reply interface{}) error {
// Open a new session
- stream, err := session.Open()
+ stream, err := session.OpenTimeout(10*time.Second)
if err != nil {
return err
}
@@ -244,7 +244,7 @@ func NodeRpc(session *yamux.Session, method string, args, reply interface{}) err
// the callers responsibility to close the connection if there is no error.
func NodeStreamingRpc(session *yamux.Session, method string) (net.Conn, error) {
// Open a new session
- stream, err := session.Open()
+ stream, err := session.OpenTimeout(10*time.Second)
if err != nil {
return nil, err
}
```

and here is the yamux patch:

```diff
diff --git a/vendor/github.com/hashicorp/yamux/session.go b/vendor/github.com/hashicorp/yamux/session.go
index a80ddec35..37c2f2ea1 100644
--- a/vendor/github.com/hashicorp/yamux/session.go
+++ b/vendor/github.com/hashicorp/yamux/session.go
@@ -151,8 +151,41 @@ func (s *Session) Open() (net.Conn, error) {
return conn, nil
}
-// OpenStream is used to create a new stream
+func (s *Session) OpenTimeout(timeout time.Duration) (net.Conn, error) {
+ conn, err := s.OpenStreamTimeout(timeout)
+ if err != nil {
+ return nil, err
+ }
+ return conn, nil
+}
+
func (s *Session) OpenStream() (*Stream, error) {
+ return s._openStream(nil, func(stream *Stream) error {
+ return stream.sendWindowUpdate()
+ })
+}
+
+func (s *Session) OpenStreamTimeout(timeout time.Duration) (*Stream, error) {
+ t := timerPool.Get()
+ timer := t.(*time.Timer)
+ timer.Reset(timeout)
+ defer func() {
+ if !timer.Stop() {
+ select {
+ case <-timer.C:
+ default:
+ }
+ }
+ timerPool.Put(t)
+ }()
+
+ return s._openStream(timer.C, func(stream *Stream) error {
+ return stream.sendWindowUpdateTimeout(timer.C)
+ })
+}
+
+// OpenStream is used to create a new stream
+func (s *Session) _openStream(timeout <-chan time.Time, fn func(*Stream) error) (*Stream, error) {
if s.IsClosed() {
return nil, ErrSessionShutdown
}
@@ -165,6 +198,8 @@ func (s *Session) OpenStream() (*Stream, error) {
case s.synCh <- struct{}{}:
case <-s.shutdownCh:
return nil, ErrSessionShutdown
+ case <-timeout:
+ return nil, ErrTimeout
}
GET_ID:
@@ -185,7 +220,7 @@ GET_ID:
s.streamLock.Unlock()
// Send the window update to create
- if err := stream.sendWindowUpdate(); err != nil {
+ if err := fn(stream); err != nil {
select {
case <-s.synCh:
default:
@@ -340,20 +375,25 @@ func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) e
timer := t.(*time.Timer)
timer.Reset(s.config.ConnectionWriteTimeout)
defer func() {
- timer.Stop()
- select {
- case <-timer.C:
- default:
+ if !timer.Stop() {
+ select {
+ case <-timer.C:
+ default:
+ }
}
timerPool.Put(t)
}()
+ return s.waitForSendErrTimeout(timer.C, hdr, body, errCh)
+}
+
+func (s *Session) waitForSendErrTimeout(timeout <-chan time.Time, hdr header, body io.Reader, errCh chan error) error {
ready := sendReady{Hdr: hdr, Body: body, Err: errCh}
select {
case s.sendCh <- ready:
case <-s.shutdownCh:
return ErrSessionShutdown
- case <-timer.C:
+ case <-timeout:
return ErrConnectionWriteTimeout
}
@@ -362,7 +402,7 @@ func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) e
return err
case <-s.shutdownCh:
return ErrSessionShutdown
- case <-timer.C:
+ case <-timeout:
return ErrConnectionWriteTimeout
}
}
@@ -625,6 +665,7 @@ func (s *Session) incomingStream(id uint32) error {
func (s *Session) closeStream(id uint32) {
s.streamLock.Lock()
if _, ok := s.inflight[id]; ok {
+ delete(s.inflight, id)
select {
case <-s.synCh:
default:
diff --git a/vendor/github.com/hashicorp/yamux/stream.go b/vendor/github.com/hashicorp/yamux/stream.go
index aa2391973..b2a523599 100644
--- a/vendor/github.com/hashicorp/yamux/stream.go
+++ b/vendor/github.com/hashicorp/yamux/stream.go
@@ -237,6 +237,16 @@ func (s *Stream) sendFlags() uint16 {
// sendWindowUpdate potentially sends a window update enabling
// further writes to take place. Must be invoked with the lock.
func (s *Stream) sendWindowUpdate() error {
+ return s._sendWindowUpdate(s.session.waitForSendErr)
+}
+
+func (s *Stream) sendWindowUpdateTimeout(timeout <-chan time.Time) error {
+ return s._sendWindowUpdate(func(hdr header, body io.Reader, errCh chan error) error {
+ return s.session.waitForSendErrTimeout(timeout, hdr, body, errCh)
+ })
+}
+
+func (s *Stream) _sendWindowUpdate(fn func(hdr header, body io.Reader, errCh chan error) error) error {
s.controlHdrLock.Lock()
defer s.controlHdrLock.Unlock()
@@ -264,7 +274,7 @@ func (s *Stream) sendWindowUpdate() error {
// Send the header
s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
- if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil {
+ if err := fn(s.controlHdr, nil, s.controlErr); err != nil {
return err
}
return nil
diff --git a/vendor/github.com/hashicorp/yamux/util.go b/vendor/github.com/hashicorp/yamux/util.go
index 8a73e9249..ff1a31b7c 100644
--- a/vendor/github.com/hashicorp/yamux/util.go
+++ b/vendor/github.com/hashicorp/yamux/util.go
@@ -9,7 +9,12 @@ var (
timerPool = &sync.Pool{
New: func() interface{} {
timer := time.NewTimer(time.Hour * 1e6)
- timer.Stop()
+ if !timer.Stop() {
+ select {
+ case <-timer.C:
+ default:
+ }
+ }
return timer
},
}
```
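A side note on the timer handling in the patch above: a fired but undrained *time.Timer returned to a sync.Pool can hand a stale expiration to the next user, which is what the stop-and-drain blocks guard against. A standalone sketch of the idiom (illustrative only):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// timerPool hands out timers that are stopped and drained, mirroring the
// idiom applied in the yamux patch above.
var timerPool = sync.Pool{
	New: func() interface{} {
		t := time.NewTimer(time.Hour)
		if !t.Stop() {
			<-t.C
		}
		return t
	},
}

func withTimeout(d time.Duration, work <-chan struct{}) bool {
	t := timerPool.Get().(*time.Timer)
	t.Reset(d)
	defer func() {
		// If Stop reports the timer already fired, drain the channel so the
		// next Get from the pool does not observe a stale expiration.
		if !t.Stop() {
			select {
			case <-t.C:
			default:
			}
		}
		timerPool.Put(t)
	}()

	select {
	case <-work:
		return true // finished before the deadline
	case <-t.C:
		return false // timed out
	}
}

func main() {
	work := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(work)
	}()
	fmt.Println("completed before timeout:", withTimeout(time.Second, work))
}
```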
@tantra35 I just wanted to follow up with you on this issue. On Feb 19 we shipped 0.10.4. My colleague @notnoop landed a fix for leadership flapping (#6977), which, combined with the RPC fix we shipped in 0.10.3, we believe should fix the problems in this issue and in #6620. We continue to recommend against using t2 instance classes, but we think this should wrap up this issue for you.
Based on the comments in #6620 (comment), I'm going to close this issue and we can follow up on any remaining issues in #4606. |
I'm going to lock this issue because it has been closed for 120 days ⏳. This helps our maintainers find and focus on the active issues. |
Nomad version
0.8.6
Issue
We have seen this 2 or 3 times, and it looks like 2 Nomad leaders were present at the same time.
Look at the screenshot of Nomad telemetry (heartbeat graphs).
There is an overlap of heartbeats on the graphs above (2 leaders present in the cluster). When the first leader switch happens, goroutine counts begin to grow on the host where the leader was running.
The switch happens between consulnomad-02 (ip: 172.29.0.72) and ip-172-29-20-8.
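A hedged way to confirm such an overlap (not part of the original report) is to poll each server's /v1/status/leader endpoint and compare the answers; two servers reporting different leaders at the same moment matches the overlap on the graphs. The addresses below are illustrative:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"time"
)

// Illustrative server addresses; adjust to the real cluster members.
var servers = []string{
	"http://172.29.0.72:4646",
	"http://172.29.20.8:4646",
}

// leaderSeenBy asks one server which node it currently believes is the leader.
func leaderSeenBy(server string) string {
	resp, err := http.Get(server + "/v1/status/leader")
	if err != nil {
		return "error: " + err.Error()
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	return string(body) // e.g. "172.29.0.72:4647"
}

func main() {
	for {
		seen := make(map[string]string, len(servers))
		for _, s := range servers {
			seen[s] = leaderSeenBy(s)
		}
		fmt.Println(time.Now().Format(time.RFC3339), seen)
		time.Sleep(5 * time.Second)
	}
}
```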
Here are the logs: