Improve VTOrc failure detection to be able to better handle dead primary failures (#13190)

* test: add a failing test

Signed-off-by: Manan Gupta <[email protected]>

* feat: fix the problem

Signed-off-by: Manan Gupta <[email protected]>

* feat: read vttablet records for instances that have no mysql port too

Signed-off-by: Manan Gupta <[email protected]>

* feat: refactor the code

Signed-off-by: Manan Gupta <[email protected]>

* feat: add tests for the newly introduced function

Signed-off-by: Manan Gupta <[email protected]>

* test: fix test expectations

Signed-off-by: Manan Gupta <[email protected]>

* feat: fix flakiness in tests

Signed-off-by: Manan Gupta <[email protected]>

* feat: fix comments

Signed-off-by: Manan Gupta <[email protected]>

---------

Signed-off-by: Manan Gupta <[email protected]>
GuptaManan100 authored Jun 22, 2023
1 parent f0d7289 commit e6da11d
Showing 12 changed files with 407 additions and 81 deletions.
6 changes: 3 additions & 3 deletions go/test/endtoend/vtorc/api/api_test.go
@@ -92,7 +92,7 @@ func TestAPIEndpoints(t *testing.T) {

// Before we disable recoveries, let us wait until VTOrc has fixed all the issues (if any).
_, _ = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool {
return response != "[]"
return response != "null"
})

t.Run("Disable Recoveries API", func(t *testing.T) {
@@ -112,7 +112,7 @@ func TestAPIEndpoints(t *testing.T) {
// Wait until VTOrc picks up on this issue and verify
// that we see a not null result on the api/replication-analysis page
status, resp := utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool {
return response == "[]"
return response == "null"
})
assert.Equal(t, 200, status, resp)
assert.Contains(t, resp, fmt.Sprintf(`"AnalyzedInstanceAlias": "%s"`, replica.Alias))
@@ -134,7 +134,7 @@ func TestAPIEndpoints(t *testing.T) {
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=80-")
require.NoError(t, err)
assert.Equal(t, 200, status, resp)
assert.Equal(t, "[]", resp)
assert.Equal(t, "null", resp)

// Check that filtering using just the shard fails
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?shard=0")
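
A note on the three expectation changes above: GetReplicationAnalysis (see the go/vt/vtorc/inst/analysis_dao.go diff below) now accumulates its result into a nil []*ReplicationAnalysis instead of an initialized []ReplicationAnalysis{}, and Go's encoding/json marshals a nil slice as null rather than []. A minimal standalone illustration of that behavior (not part of the commit):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	emptySlice := []int{} // non-nil, zero-length slice: marshals like the old return value
	var nilSlice []int    // nil slice: what the analysis API now yields when there is nothing to report

	emptyJSON, _ := json.Marshal(emptySlice)
	nilJSON, _ := json.Marshal(nilSlice)

	fmt.Println(string(emptyJSON)) // prints: []
	fmt.Println(string(nilJSON))   // prints: null
}
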
2 changes: 1 addition & 1 deletion go/test/endtoend/vtorc/general/vtorc_test.go
@@ -421,7 +421,7 @@ func TestDurabilityPolicySetLater(t *testing.T) {
time.Sleep(30 * time.Second)

// Now set the correct durability policy
out, err := newCluster.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspace.Name, "--durability-policy=semi_sync")
out, err := newCluster.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspace.Name, "--durability-policy=semi_sync")
require.NoError(t, err, out)

// VTOrc should promote a new primary after seeing the durability policy change
2 changes: 1 addition & 1 deletion go/test/endtoend/vtorc/primaryfailure/main_test.go
@@ -32,7 +32,7 @@ func TestMain(m *testing.M) {
var cellInfos []*utils.CellInfo
cellInfos = append(cellInfos, &utils.CellInfo{
CellName: utils.Cell1,
NumReplicas: 12,
NumReplicas: 13,
NumRdonly: 3,
UIDBase: 100,
})
54 changes: 54 additions & 0 deletions go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go
@@ -97,6 +97,60 @@ func TestDownPrimary(t *testing.T) {
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.RecoverDeadPrimaryRecoveryName, 1)
}

// bring down the primary before VTOrc has started, and let VTOrc repair it.
func TestDownPrimaryBeforeVTOrc(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 1, nil, cluster.VTOrcConfiguration{}, 0, "none")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
curPrimary := shard0.Vttablets[0]

// Promote the first tablet as the primary
err := clusterInfo.ClusterInstance.VtctlclientProcess.InitializeShard(keyspace.Name, shard0.Name, clusterInfo.ClusterInstance.Cell, curPrimary.TabletUID)
require.NoError(t, err)

// find the replica and rdonly tablets
var replica, rdonly *cluster.Vttablet
for _, tablet := range shard0.Vttablets {
// we know we have only two replica tablets, so the one that is not the primary must be the other replica
if tablet.Alias != curPrimary.Alias && tablet.Type == "replica" {
replica = tablet
}
if tablet.Type == "rdonly" {
rdonly = tablet
}
}
assert.NotNil(t, replica, "could not find replica tablet")
assert.NotNil(t, rdonly, "could not find rdonly tablet")

// check that the replication is setup correctly before we failover
utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{rdonly, replica}, 10*time.Second)

// Make the current primary vttablet unavailable.
_ = curPrimary.VttabletProcess.TearDown()
err = curPrimary.MysqlctlProcess.Stop()
require.NoError(t, err)

// Start a VTOrc instance
utils.StartVTOrcs(t, clusterInfo, []string{"--remote_operation_timeout=10s"}, cluster.VTOrcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)

vtOrcProcess := clusterInfo.ClusterInstance.VTOrcProcesses[0]

defer func() {
// we remove the tablet from our global list
utils.PermanentlyRemoveVttablet(clusterInfo, curPrimary)
}()

// check that the replica gets promoted
utils.CheckPrimaryTablet(t, clusterInfo, replica, true)

// also check that the replication is working correctly after failover
utils.VerifyWritesSucceed(t, clusterInfo, replica, []*cluster.Vttablet{rdonly}, 10*time.Second)
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.RecoverDeadPrimaryRecoveryName, 1)
}

// TestDeadPrimaryRecoversImmediately tests VTOrc's ability to recover immediately if the primary is dead.
// The reason is that, unlike other recoveries, in DeadPrimary we don't call DiscoverInstance, since we know
// that the primary is unreachable. This helps us save a few seconds, depending on the value of the `RemoteOperationTimeout` flag.
36 changes: 14 additions & 22 deletions go/test/endtoend/vtorc/utils/utils.go
@@ -67,11 +67,10 @@ type CellInfo struct {

// VTOrcClusterInfo stores the information for a cluster. This is supposed to be used only for VTOrc tests.
type VTOrcClusterInfo struct {
ClusterInstance *cluster.LocalProcessCluster
Ts *topo.Server
CellInfos []*CellInfo
VtctldClientProcess *cluster.VtctldClientProcess
lastUsedValue int
ClusterInstance *cluster.LocalProcessCluster
Ts *topo.Server
CellInfos []*CellInfo
lastUsedValue int
}

// CreateClusterAndStartTopo starts the cluster and topology service
@@ -100,17 +99,13 @@ func CreateClusterAndStartTopo(cellInfos []*CellInfo) (*VTOrcClusterInfo, error)
return nil, err
}

// store the vtctldclient process
vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", clusterInstance.VtctldProcess.GrpcPort, clusterInstance.TmpDirectory)

// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
return &VTOrcClusterInfo{
ClusterInstance: clusterInstance,
Ts: ts,
CellInfos: cellInfos,
lastUsedValue: 100,
VtctldClientProcess: vtctldClientProcess,
ClusterInstance: clusterInstance,
Ts: ts,
CellInfos: cellInfos,
lastUsedValue: 100,
}, err
}

@@ -307,7 +302,7 @@ func SetupVttabletsAndVTOrcs(t *testing.T, clusterInfo *VTOrcClusterInfo, numRep
if durability == "" {
durability = "none"
}
out, err := clusterInfo.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, fmt.Sprintf("--durability-policy=%s", durability))
out, err := clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, fmt.Sprintf("--durability-policy=%s", durability))
require.NoError(t, err, out)

// start vtorc
@@ -829,20 +824,17 @@ func SetupNewClusterSemiSync(t *testing.T) *VTOrcClusterInfo {
require.NoError(t, err)
}

vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", clusterInstance.VtctldProcess.GrpcPort, clusterInstance.TmpDirectory)

out, err := vtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")
out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")
require.NoError(t, err, out)

// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)
clusterInfo := &VTOrcClusterInfo{
ClusterInstance: clusterInstance,
Ts: ts,
CellInfos: nil,
lastUsedValue: 100,
VtctldClientProcess: vtctldClientProcess,
ClusterInstance: clusterInstance,
Ts: ts,
CellInfos: nil,
lastUsedValue: 100,
}
return clusterInfo
}
2 changes: 2 additions & 0 deletions go/vt/vtorc/inst/analysis.go
@@ -31,6 +31,8 @@ type StructureAnalysisCode string
const (
NoProblem AnalysisCode = "NoProblem"
ClusterHasNoPrimary AnalysisCode = "ClusterHasNoPrimary"
InvalidPrimary AnalysisCode = "InvalidPrimary"
InvalidReplica AnalysisCode = "InvalidReplica"
DeadPrimaryWithoutReplicas AnalysisCode = "DeadPrimaryWithoutReplicas"
DeadPrimary AnalysisCode = "DeadPrimary"
DeadPrimaryAndReplicas AnalysisCode = "DeadPrimaryAndReplicas"
88 changes: 73 additions & 15 deletions go/vt/vtorc/inst/analysis_dao.go
@@ -56,13 +56,20 @@ func initializeAnalysisDaoPostConfiguration() {

type clusterAnalysis struct {
hasClusterwideAction bool
totalTablets int
primaryAlias string
durability reparentutil.Durabler
}

// GetReplicationAnalysis will check for replication problems (dead primary; unreachable primary; etc)
func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAnalysisHints) ([]ReplicationAnalysis, error) {
result := []ReplicationAnalysis{}
func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAnalysisHints) ([]*ReplicationAnalysis, error) {
var result []*ReplicationAnalysis
appendAnalysis := func(analysis *ReplicationAnalysis) {
if analysis.Analysis == NoProblem && len(analysis.StructureAnalysis) == 0 {
return
}
result = append(result, analysis)
}

// TODO(sougou): deprecate ReduceReplicationAnalysisCount
args := sqlutils.Args(config.Config.ReasonableReplicationLagSeconds, ValidSecondsFromSeenToLastAttemptedCheck(), config.Config.ReasonableReplicationLagSeconds, keyspace, shard)
@@ -262,6 +269,8 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
)
LEFT JOIN database_instance primary_instance ON (
vitess_tablet.alias = primary_instance.alias
AND vitess_tablet.hostname = primary_instance.hostname
AND vitess_tablet.port = primary_instance.port
)
LEFT JOIN vitess_tablet primary_tablet ON (
primary_tablet.hostname = primary_instance.source_host
@@ -286,7 +295,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna

clusters := make(map[string]*clusterAnalysis)
err := db.Db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error {
a := ReplicationAnalysis{
a := &ReplicationAnalysis{
Analysis: NoProblem,
ProcessingNodeHostname: process.ThisHostname,
ProcessingNodeToken: util.ProcessToken.Hash,
@@ -406,6 +415,8 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
}
// ca has clusterwide info
ca := clusters[keyspaceShard]
// Increment the total number of tablets.
ca.totalTablets += 1
if ca.hasClusterwideAction {
// We can only take one cluster level action at a time.
return nil
@@ -415,10 +426,13 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
return nil
}
isInvalid := m.GetBool("is_invalid")
if isInvalid {
return nil
}
if a.IsClusterPrimary && !a.LastCheckValid && a.CountReplicas == 0 {
if a.IsClusterPrimary && isInvalid {
a.Analysis = InvalidPrimary
a.Description = "VTOrc hasn't been able to reach the primary even once since restart/shutdown"
} else if isInvalid {
a.Analysis = InvalidReplica
a.Description = "VTOrc hasn't been able to reach the replica even once since restart/shutdown"
} else if a.IsClusterPrimary && !a.LastCheckValid && a.CountReplicas == 0 {
a.Analysis = DeadPrimaryWithoutReplicas
a.Description = "Primary cannot be reached by vtorc and has no replica"
ca.hasClusterwideAction = true
@@ -532,13 +546,6 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
// a.Description = "Primary has no replicas"
// }

appendAnalysis := func(analysis *ReplicationAnalysis) {
if a.Analysis == NoProblem && len(a.StructureAnalysis) == 0 {
return
}
result = append(result, a)
}

{
// Moving on to structure analysis
// We also do structural checks. See if there's potential danger in promotions
@@ -579,7 +586,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
a.StructureAnalysis = append(a.StructureAnalysis, NotEnoughValidSemiSyncReplicasStructureWarning)
}
}
appendAnalysis(&a)
appendAnalysis(a)

if a.CountReplicas > 0 && hints.AuditAnalysis {
// Interesting enough for analysis
Expand All @@ -590,13 +597,64 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
return nil
})

for _, analysis := range result {
log.Errorf("Analysis - Instance - %v, Code - %v, LastCheckValid - %v, ReplStopped - %v", analysis.AnalyzedInstanceAlias, analysis.Analysis, analysis.LastCheckValid, analysis.ReplicationStopped)
}

result = postProcessAnalyses(result, clusters)

if err != nil {
log.Error(err)
}
// TODO: result, err = getConcensusReplicationAnalysis(result)
return result, err
}

// postProcessAnalyses updates the analyses based on information gleaned from looking at all of them together, instead of at each tablet's data in isolation.
func postProcessAnalyses(result []*ReplicationAnalysis, clusters map[string]*clusterAnalysis) []*ReplicationAnalysis {
for {
// Store whether we have changed the result of replication analysis or not.
resultChanged := false

// Go over all the analyses.
for _, analysis := range result {
// If one of them is an InvalidPrimary, then we see if all the other tablets in this keyspace shard are
// unable to replicate or not.
if analysis.Analysis == InvalidPrimary {
keyspaceName := analysis.ClusterDetails.Keyspace
shardName := analysis.ClusterDetails.Shard
keyspaceShard := getKeyspaceShardName(keyspaceName, shardName)
totalReplicas := clusters[keyspaceShard].totalTablets - 1
var notReplicatingReplicas []int
for idx, replicaAnalysis := range result {
if replicaAnalysis.ClusterDetails.Keyspace == keyspaceName &&
replicaAnalysis.ClusterDetails.Shard == shardName && topo.IsReplicaType(replicaAnalysis.TabletType) {
// If the replica's last check is invalid or its replication is stopped, then we consider it as not replicating.
if !replicaAnalysis.LastCheckValid || replicaAnalysis.ReplicationStopped {
notReplicatingReplicas = append(notReplicatingReplicas, idx)
}
}
}
// If none of the other tablets are able to replicate, then we conclude that this primary is not just Invalid, but also Dead.
// In this case, we update the analysis for the primary tablet and remove all the analyses of the replicas.
if totalReplicas > 0 && len(notReplicatingReplicas) == totalReplicas {
resultChanged = true
analysis.Analysis = DeadPrimary
for i := len(notReplicatingReplicas) - 1; i >= 0; i-- {
idxToRemove := notReplicatingReplicas[i]
result = append(result[0:idxToRemove], result[idxToRemove+1:]...)
}
break
}
}
}
if !resultChanged {
break
}
}
return result
}

// auditInstanceAnalysisInChangelog will write down an instance's analysis in the database_instance_analysis_changelog table.
// To not repeat recurring analysis code, the database_instance_last_analysis table is used, so that only changes to
// analysis codes are written.
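
To make the new post-processing step concrete: an InvalidPrimary analysis is escalated to DeadPrimary once every replica in the same keyspace/shard is either unreachable or has stopped replicating, and those replicas' individual analyses are dropped in favor of the single cluster-wide one. The sketch below restates that rule in a simplified, self-contained form, using hypothetical types and names rather than the actual vtorc package:

package main

import "fmt"

// analysis is a stripped-down stand-in for vtorc's ReplicationAnalysis.
type analysis struct {
	Alias              string
	Code               string // e.g. "InvalidPrimary", "InvalidReplica", "NoProblem"
	IsReplica          bool
	LastCheckValid     bool
	ReplicationStopped bool
}

// escalate mirrors the idea behind postProcessAnalyses for a single shard:
// if the primary is Invalid and no replica is replicating, conclude that the
// primary is Dead and drop the replicas' analyses.
func escalate(result []*analysis, totalTablets int) []*analysis {
	for _, a := range result {
		if a.Code != "InvalidPrimary" {
			continue
		}
		totalReplicas := totalTablets - 1
		var notReplicating []int
		for idx, r := range result {
			// An invalid last check or stopped replication counts as "not replicating".
			if r.IsReplica && (!r.LastCheckValid || r.ReplicationStopped) {
				notReplicating = append(notReplicating, idx)
			}
		}
		if totalReplicas > 0 && len(notReplicating) == totalReplicas {
			a.Code = "DeadPrimary"
			// Splice out the replica analyses, highest index first, so the
			// remaining indices stay valid during removal.
			for i := len(notReplicating) - 1; i >= 0; i-- {
				idx := notReplicating[i]
				result = append(result[:idx], result[idx+1:]...)
			}
		}
		break
	}
	return result
}

func main() {
	shard := []*analysis{
		{Alias: "zone1-0000000101", Code: "InvalidPrimary"},
		{Alias: "zone1-0000000102", Code: "InvalidReplica", IsReplica: true},
		{Alias: "zone1-0000000103", Code: "NoProblem", IsReplica: true, LastCheckValid: true, ReplicationStopped: true},
	}
	for _, a := range escalate(shard, 3) {
		fmt.Println(a.Alias, a.Code) // prints: zone1-0000000101 DeadPrimary
	}
}

The real implementation wraps this pass in a loop that repeats until no analysis changes, since splicing out one shard's replica rows can expose further escalations elsewhere in a multi-shard result.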