Prevent interleaving of setting/unsetting observer states with `jetstream_cluster_migrate` (#5503)

This can happen when `reConnectToRemoteLeafNode`, triggered by a connection
drop, and `connectToRemoteLeafNode`, triggered by solicitation, run in
separate goroutines for the same remotes.

Signed-off-by: Neil Twigg <[email protected]>

neilalexander authored Jun 7, 2024
1 parent 47311dd commit 684526c
Showing 3 changed files with 18 additions and 1 deletion.
10 changes: 9 additions & 1 deletion locksordering.txt
@@ -21,4 +21,12 @@ A reloadMu lock was added to prevent newly connecting clients racing with the co
This must be taken out as soon as a reload is about to happen before any other locks:

reloadMu -> Server
reloadMu -> optsMu
reloadMu -> optsMu

The "jscmMu" lock in the Account is used to serialise calls to checkJetStreamMigrate and
clearObserverState so that they cannot interleave which would leave Raft nodes in
inconsistent observer states.

jscmMu -> Account -> jsAccount
jscmMu -> stream.clsMu
jscmMu -> RaftNode
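
To illustrate what an ordering such as `jscmMu -> Account -> jsAccount` means in practice, here is a rough sketch. It assumes hypothetical `account` and `jsAccount` types with their own mutexes and a made-up `streamCount` helper; none of these reflect the real struct layouts. The point is only that the outermost lock in the chain is acquired first and inner locks are acquired while it is still held, never the other way round.

```go
package main

import (
	"fmt"
	"sync"
)

// jsAccount is a hypothetical stand-in for the innermost lock holder.
type jsAccount struct {
	mu      sync.Mutex
	streams int
}

// account is a hypothetical stand-in with the two outer locks.
type account struct {
	jscmMu sync.Mutex   // outermost: taken before anything else
	mu     sync.RWMutex // the account's own lock
	js     *jsAccount
}

// streamCount acquires locks strictly in the documented order:
// jscmMu, then the account lock, then the jsAccount lock.
func (a *account) streamCount() int {
	a.jscmMu.Lock()
	defer a.jscmMu.Unlock()

	a.mu.RLock()
	defer a.mu.RUnlock()
	if a.js == nil {
		return 0
	}

	a.js.mu.Lock()
	defer a.js.mu.Unlock()
	return a.js.streams
}

func main() {
	acc := &account{js: &jsAccount{streams: 3}}
	fmt.Println("streams:", acc.streamCount()) // prints "streams: 3"
}
```

Code that already holds the account lock and then tries to take `jscmMu` would violate this order and could deadlock against a goroutine following it.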
3 changes: 3 additions & 0 deletions server/accounts.go
@@ -103,6 +103,9 @@ type Account struct {
	// and if it falls between 0 and that value, message tracing will be triggered.
	traceDest string
	traceDestSampling int
	// Guarantee that only one goroutine can be running either checkJetStreamMigrate
	// or clearObserverState at a given time for this account to prevent interleaving.
	jscmMu sync.Mutex
}

const (
6 changes: 6 additions & 0 deletions server/leafnode.go
@@ -584,6 +584,9 @@ func (s *Server) clearObserverState(remote *leafNodeCfg) {
		return
	}

	acc.jscmMu.Lock()
	defer acc.jscmMu.Unlock()

	// Walk all streams looking for any clustered stream, skip otherwise.
	for _, mset := range acc.streams() {
		node := mset.raftNode()
@@ -619,6 +622,9 @@ func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
		return
	}

	acc.jscmMu.Lock()
	defer acc.jscmMu.Unlock()

	// Walk all streams looking for any clustered stream, skip otherwise.
	// If we are the leader force stepdown.
	for _, mset := range acc.streams() {