Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: support reset_tiflash after ebs restoration #40124

Merged
merged 16 commits into from
Jan 9, 2023
93 changes: 93 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2721,3 +2721,96 @@ func CheckNewCollationEnable(
log.Info("set new_collation_enabled", zap.Bool("new_collation_enabled", enabled))
return nil
}

func (rc *Client) ResetTiFlashReplicas(ctx context.Context, g glue.Glue, storage kv.Storage) error {
dom, err := g.GetDomain(storage)
if err != nil {
return errors.Trace(err)
}
info := dom.InfoSchema()
allSchema := info.AllSchemas()
recorder := tiflashrec.New()

expectTiFlashStoreCount := uint64(0)
needTiFlash := false
for _, s := range allSchema {
for _, t := range s.Tables {
if t.TiFlashReplica != nil {
expectTiFlashStoreCount = mathutil.Max(expectTiFlashStoreCount, t.TiFlashReplica.Count)
recorder.AddTable(t.ID, *t.TiFlashReplica)
needTiFlash = true
}
}
}
if !needTiFlash {
log.Info("no need to set tiflash replica, since there is no tables enable tiflash replica")
return nil
}
// we wait for ten minutes to wait tiflash starts.
// since tiflash only starts when set unmark recovery mode finished.
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
err = utils.WithRetry(timeoutCtx, func() error {
tiFlashStoreCount, err := rc.getTiFlashNodeCount(ctx)
log.Info("get tiflash store count for resetting TiFlash Replica",
zap.Uint64("count", tiFlashStoreCount))
if err != nil {
return errors.Trace(err)
}
if tiFlashStoreCount < expectTiFlashStoreCount {
log.Info("still waiting for enough tiflash store start",
zap.Uint64("expect", expectTiFlashStoreCount),
zap.Uint64("actual", tiFlashStoreCount),
)
return errors.New("tiflash store count is less than expected")
}
return nil
}, &waitTiFlashBackoffer{
Attempts: 30,
BaseBackoff: 4 * time.Second,
})
if err != nil {
return err
}

sqls := recorder.GenerateResetAlterTableDDLs(info)
log.Info("Generating SQLs for resetting tiflash replica",
zap.Strings("sqls", sqls))

return g.UseOneShotSession(storage, false, func(se glue.Session) error {
for _, sql := range sqls {
if errExec := se.ExecuteInternal(ctx, sql); errExec != nil {
logutil.WarnTerm("Failed to restore tiflash replica config, you may execute the sql restore it manually.",
logutil.ShortError(errExec),
zap.String("sql", sql),
)
}
}
return nil
})

}

type waitTiFlashBackoffer struct {
Attempts int
BaseBackoff time.Duration
}

// NextBackoff returns a duration to wait before retrying again
func (b *waitTiFlashBackoffer) NextBackoff(error) time.Duration {
bo := b.BaseBackoff
b.Attempts--
if b.Attempts == 0 {
return 0
}
b.BaseBackoff *= 2
if b.BaseBackoff > 32*time.Second {
b.BaseBackoff = 32 * time.Second
}
return bo
}

// Attempt returns the remain attempt times
func (b *waitTiFlashBackoffer) Attempt() int {
return b.Attempts
}
49 changes: 47 additions & 2 deletions br/pkg/restore/tiflashrec/tiflash_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,43 @@ func (r *TiFlashRecorder) Rewrite(oldID int64, newID int64) {
}
}

func (r *TiFlashRecorder) GenerateResetAlterTableDDLs(info infoschema.InfoSchema) []string {
items := make([]string, 0, len(r.items))
r.Iterate(func(id int64, replica model.TiFlashReplicaInfo) {
table, ok := info.TableByID(id)
if !ok {
log.Warn("Table do not exist, skipping", zap.Int64("id", id))
return
}
schema, ok := info.SchemaByTable(table.Meta())
if !ok {
log.Warn("Schema do not exist, skipping", zap.Int64("id", id), zap.Stringer("table", table.Meta().Name))
return
}
altTableSpec, err := alterTableSpecOf(replica, true)
if err != nil {
log.Warn("Failed to generate the alter table spec", logutil.ShortError(err), zap.Any("replica", replica))
return
}
items = append(items, fmt.Sprintf(
"ALTER TABLE %s %s",
utils.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O),
altTableSpec),
)
altTableSpec, err = alterTableSpecOf(replica, false)
Comment on lines +98 to +108
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment about the reason for setting the tiflash-count=0 and then set normal tiflash-count.

if err != nil {
log.Warn("Failed to generate the alter table spec", logutil.ShortError(err), zap.Any("replica", replica))
return
}
items = append(items, fmt.Sprintf(
"ALTER TABLE %s %s",
utils.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O),
altTableSpec),
)
})
return items
}

func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []string {
items := make([]string, 0, len(r.items))
r.Iterate(func(id int64, replica model.TiFlashReplicaInfo) {
Expand All @@ -92,7 +129,7 @@ func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []s
log.Warn("Schema do not exist, skipping", zap.Int64("id", id), zap.Stringer("table", table.Meta().Name))
return
}
altTableSpec, err := alterTableSpecOf(replica)
altTableSpec, err := alterTableSpecOf(replica, false)
if err != nil {
log.Warn("Failed to generate the alter table spec", logutil.ShortError(err), zap.Any("replica", replica))
return
Expand All @@ -106,14 +143,22 @@ func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []s
return items
}

func alterTableSpecOf(replica model.TiFlashReplicaInfo) (string, error) {
func alterTableSpecOf(replica model.TiFlashReplicaInfo, reset bool) (string, error) {
spec := &ast.AlterTableSpec{
Tp: ast.AlterTableSetTiFlashReplica,
TiFlashReplica: &ast.TiFlashReplicaSpec{
Count: replica.Count,
Labels: replica.LocationLabels,
},
}
if reset {
spec = &ast.AlterTableSpec{
Tp: ast.AlterTableSetTiFlashReplica,
TiFlashReplica: &ast.TiFlashReplicaSpec{
Count: 0,
},
}
}

buf := bytes.NewBuffer(make([]byte, 0, 32))
restoreCx := format.NewRestoreCtx(
Expand Down
30 changes: 30 additions & 0 deletions br/pkg/restore/tiflashrec/tiflash_recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,33 @@ func TestGenSql(t *testing.T) {
"ALTER TABLE `test`.`evils` SET TIFLASH REPLICA 1 LOCATION LABELS 'kIll''; OR DROP DATABASE test --', 'dEaTh with " + `\\"quoting\\"` + "'",
})
}

func TestGenResetSql(t *testing.T) {
tInfo := func(id int, name string) *model.TableInfo {
return &model.TableInfo{
ID: int64(id),
Name: model.NewCIStr(name),
}
}
fakeInfo := infoschema.MockInfoSchema([]*model.TableInfo{
tInfo(1, "fruits"),
tInfo(2, "whisper"),
})
rec := tiflashrec.New()
rec.AddTable(1, model.TiFlashReplicaInfo{
Count: 1,
})
rec.AddTable(2, model.TiFlashReplicaInfo{
Count: 2,
LocationLabels: []string{"climate"},
})

sqls := rec.GenerateResetAlterTableDDLs(fakeInfo)
require.ElementsMatch(t, sqls, []string{
"ALTER TABLE `test`.`whisper` SET TIFLASH REPLICA 0",
"ALTER TABLE `test`.`whisper` SET TIFLASH REPLICA 2 LOCATION LABELS 'climate'",
"ALTER TABLE `test`.`fruits` SET TIFLASH REPLICA 0",
"ALTER TABLE `test`.`fruits` SET TIFLASH REPLICA 1",
})
}

4 changes: 4 additions & 0 deletions br/pkg/task/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
//TODO: restore volume type into origin type
//ModifyVolume(*ec2.ModifyVolumeInput) (*ec2.ModifyVolumeOutput, error) by backupmeta

// since we cannot reset tiflash automaticlly. so we should start it manually
if err = client.ResetTiFlashReplicas(ctx, g, mgr.GetStorage()); err != nil {
return errors.Trace(err)
}
progress.Close()
summary.CollectDuration("restore duration", time.Since(startAll))
summary.SetSuccessStatus(true)
Expand Down