Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/release-6.5' into pr/40689
Browse files Browse the repository at this point in the history
  • Loading branch information
fzzf678 committed Jan 18, 2023
2 parents 1e3ba9e + d1dcac6 commit e0d6c57
Show file tree
Hide file tree
Showing 11 changed files with 304 additions and 37 deletions.
8 changes: 4 additions & 4 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,13 @@ func (bc *Client) StartCheckpointRunner(
}
}

bc.checkpointRunner = checkpoint.StartCheckpointRunner(ctx, bc.storage, bc.cipher)
return nil
bc.checkpointRunner, err = checkpoint.StartCheckpointRunner(ctx, bc.storage, bc.cipher, bc.mgr.GetPDClient())
return errors.Trace(err)
}

func (bc *Client) WaitForFinishCheckpoint() {
func (bc *Client) WaitForFinishCheckpoint(ctx context.Context) {
if bc.checkpointRunner != nil {
bc.checkpointRunner.WaitForFinish()
bc.checkpointRunner.WaitForFinish(ctx)
}
}

Expand Down
2 changes: 2 additions & 0 deletions br/pkg/checkpoint/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_zap//:zap",
],
)
Expand All @@ -29,5 +30,6 @@ go_test(
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/encryptionpb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
],
)
208 changes: 189 additions & 19 deletions br/pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

Expand All @@ -44,11 +45,16 @@ const (

CheckpointDataDir = CheckpointDir + "/data"
CheckpointChecksumDir = CheckpointDir + "/checksum"
CheckpointLockPath = CheckpointDir + "/checkpoint.lock"
)

const MaxChecksumTotalCost float64 = 60.0

const tickDuration = 30 * time.Second
const tickDurationForFlush = 30 * time.Second

const tickDurationForLock = 4 * time.Minute

const lockTimeToLive = 5 * time.Minute

type CheckpointMessage struct {
// start-key of the origin range
Expand Down Expand Up @@ -132,6 +138,12 @@ func NewChecksumRunner() *ChecksumRunner {
}
}

func (cr *ChecksumRunner) RecordError(err error) {
cr.Lock()
cr.err = err
cr.Unlock()
}

// FlushChecksum save the checksum in the memory temporarily
// and flush to the external storage if checksum take much time
func (cr *ChecksumRunner) FlushChecksum(
Expand Down Expand Up @@ -180,15 +192,10 @@ func (cr *ChecksumRunner) FlushChecksum(
cr.wg.Add(1)
cr.workerPool.Apply(func() {
defer cr.wg.Done()
recordErr := func(err error) {
cr.Lock()
cr.err = err
cr.Unlock()
}

content, err := json.Marshal(toBeFlushedChecksumItems)
if err != nil {
recordErr(err)
cr.RecordError(err)
return
}

Expand All @@ -200,70 +207,90 @@ func (cr *ChecksumRunner) FlushChecksum(

data, err := json.Marshal(checksumInfo)
if err != nil {
recordErr(err)
cr.RecordError(err)
return
}

fname := fmt.Sprintf("%s/t%d_and__", CheckpointChecksumDir, tableID)
err = s.WriteFile(ctx, fname, data)
if err != nil {
recordErr(err)
cr.RecordError(err)
return
}
})
return nil
}

type GlobalTimer interface {
GetTS(context.Context) (int64, int64, error)
}

type CheckpointRunner struct {
lockId uint64

meta map[string]*RangeGroups

checksumRunner *ChecksumRunner

storage storage.ExternalStorage
cipher *backuppb.CipherInfo
timer GlobalTimer

appendCh chan *CheckpointMessage
metaCh chan map[string]*RangeGroups
lockCh chan struct{}
errCh chan error

wg sync.WaitGroup
}

// only for test
func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, tick time.Duration) *CheckpointRunner {
func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, tick time.Duration, timer GlobalTimer) (*CheckpointRunner, error) {
runner := &CheckpointRunner{
meta: make(map[string]*RangeGroups),

checksumRunner: NewChecksumRunner(),

storage: storage,
cipher: cipher,
timer: timer,

appendCh: make(chan *CheckpointMessage),
metaCh: make(chan map[string]*RangeGroups),
lockCh: make(chan struct{}),
errCh: make(chan error, 1),
}

runner.startCheckpointLoop(ctx, tick)
return runner
err := runner.initialLock(ctx)
if err != nil {
return nil, errors.Annotate(err, "Failed to initialize checkpoint lock.")
}
runner.startCheckpointLoop(ctx, tick, tick)
return runner, nil
}

func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo) *CheckpointRunner {
func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, timer GlobalTimer) (*CheckpointRunner, error) {
runner := &CheckpointRunner{
meta: make(map[string]*RangeGroups),

checksumRunner: NewChecksumRunner(),

storage: storage,
cipher: cipher,
timer: timer,

appendCh: make(chan *CheckpointMessage),
metaCh: make(chan map[string]*RangeGroups),
lockCh: make(chan struct{}),
errCh: make(chan error, 1),
}

runner.startCheckpointLoop(ctx, tickDuration)
return runner
err := runner.initialLock(ctx)
if err != nil {
return nil, errors.Trace(err)
}
runner.startCheckpointLoop(ctx, tickDurationForFlush, tickDurationForLock)
return runner, nil
}

func (r *CheckpointRunner) FlushChecksum(ctx context.Context, tableID int64, crc64xor uint64, totalKvs uint64, totalBytes uint64, timeCost float64) error {
Expand Down Expand Up @@ -295,13 +322,18 @@ func (r *CheckpointRunner) Append(
}

// Note: Cannot be parallel with `Append` function
func (r *CheckpointRunner) WaitForFinish() {
func (r *CheckpointRunner) WaitForFinish(ctx context.Context) {
// can not append anymore
close(r.appendCh)
// wait the range flusher exit
r.wg.Wait()
// wait the checksum flusher exit
r.checksumRunner.wg.Wait()
// remove the checkpoint lock
err := r.storage.DeleteFile(ctx, CheckpointLockPath)
if err != nil {
log.Warn("failed to remove the checkpoint lock", zap.Error(err))
}
}

// Send the meta to the flush goroutine, and reset the CheckpointRunner's meta
Expand All @@ -318,6 +350,16 @@ func (r *CheckpointRunner) flushMeta(ctx context.Context, errCh chan error) erro
return nil
}

func (r *CheckpointRunner) setLock(ctx context.Context, errCh chan error) error {
select {
case <-ctx.Done():
case err := <-errCh:
return err
case r.lockCh <- struct{}{}:
}
return nil
}

// start a goroutine to flush the meta, which is sent from `checkpoint looper`, to the external storage
func (r *CheckpointRunner) startCheckpointRunner(ctx context.Context, wg *sync.WaitGroup) chan error {
errCh := make(chan error, 1)
Expand All @@ -337,6 +379,15 @@ func (r *CheckpointRunner) startCheckpointRunner(ctx context.Context, wg *sync.W
errCh <- err
return
}
case _, ok := <-r.lockCh:
if !ok {
log.Info("stop checkpoint flush worker")
return
}
if err := r.updateLock(ctx); err != nil {
errCh <- errors.Annotate(err, "Failed to update checkpoint lock.")
return
}
}
}
}
Expand All @@ -351,22 +402,31 @@ func (r *CheckpointRunner) sendError(err error) {
default:
log.Error("errCh is blocked", logutil.ShortError(err))
}
r.checksumRunner.RecordError(err)
}

func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration time.Duration) {
func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDurationForFlush, tickDurationForLock time.Duration) {
r.wg.Add(1)
checkpointLoop := func(ctx context.Context) {
defer r.wg.Done()
cctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
errCh := r.startCheckpointRunner(cctx, &wg)
ticker := time.NewTicker(tickDuration)
flushTicker := time.NewTicker(tickDurationForFlush)
defer flushTicker.Stop()
lockTicker := time.NewTicker(tickDurationForLock)
defer lockTicker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
case <-lockTicker.C:
if err := r.setLock(ctx, errCh); err != nil {
r.sendError(err)
return
}
case <-flushTicker.C:
if err := r.flushMeta(ctx, errCh); err != nil {
r.sendError(err)
return
Expand All @@ -380,6 +440,7 @@ func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration
// close the channel to flush worker
// and wait it to consumes all the metas
close(r.metaCh)
close(r.lockCh)
wg.Wait()
return
}
Expand Down Expand Up @@ -463,6 +524,115 @@ func (r *CheckpointRunner) doFlush(ctx context.Context, meta map[string]*RangeGr
return nil
}

type CheckpointLock struct {
LockId uint64 `json:"lock-id"`
ExpireAt int64 `json:"expire-at"`
}

// get ts with retry
func (r *CheckpointRunner) getTS(ctx context.Context) (int64, int64, error) {
var (
p int64 = 0
l int64 = 0
retry int = 0
)
errRetry := utils.WithRetry(ctx, func() error {
var err error
p, l, err = r.timer.GetTS(ctx)
if err != nil {
retry++
log.Info("failed to get ts", zap.Int("retry", retry), zap.Error(err))
return err
}

return nil
}, utils.NewPDReqBackoffer())

return p, l, errors.Trace(errRetry)
}

// flush the lock to the external storage
func (r *CheckpointRunner) flushLock(ctx context.Context, p int64) error {
lock := &CheckpointLock{
LockId: r.lockId,
ExpireAt: p + lockTimeToLive.Milliseconds(),
}
log.Info("start to flush the checkpoint lock", zap.Int64("lock-at", p), zap.Int64("expire-at", lock.ExpireAt))
data, err := json.Marshal(lock)
if err != nil {
return errors.Trace(err)
}

err = r.storage.WriteFile(ctx, CheckpointLockPath, data)
return errors.Trace(err)
}

// check whether this lock belongs to this BR
func (r *CheckpointRunner) checkLockFile(ctx context.Context, now int64) error {
data, err := r.storage.ReadFile(ctx, CheckpointLockPath)
if err != nil {
return errors.Trace(err)
}
lock := &CheckpointLock{}
err = json.Unmarshal(data, lock)
if err != nil {
return errors.Trace(err)
}
if lock.ExpireAt <= now {
if lock.LockId > r.lockId {
return errors.Errorf("There are another BR(%d) running after but setting lock before this one(%d). "+
"Please check whether the BR is running. If not, you can retry.", lock.LockId, r.lockId)
}
if lock.LockId == r.lockId {
log.Warn("The lock has expired.", zap.Int64("expire-at(ms)", lock.ExpireAt), zap.Int64("now(ms)", now))
}
} else if lock.LockId != r.lockId {
return errors.Errorf("The existing lock will expire in %d seconds. "+
"There may be another BR(%d) running. If not, you can wait for the lock to expire, or delete the file `%s%s` manually.",
(lock.ExpireAt-now)/1000, lock.LockId, strings.TrimRight(r.storage.URI(), "/"), CheckpointLockPath)
}

return nil
}

// generate a new lock and flush the lock to the external storage
func (r *CheckpointRunner) updateLock(ctx context.Context) error {
p, _, err := r.getTS(ctx)
if err != nil {
return errors.Trace(err)
}
if err = r.checkLockFile(ctx, p); err != nil {
return errors.Trace(err)
}
return errors.Trace(r.flushLock(ctx, p))
}

// Attempt to initialize the lock. Need to stop the backup when there is an unexpired locks.
func (r *CheckpointRunner) initialLock(ctx context.Context) error {
p, l, err := r.getTS(ctx)
if err != nil {
return errors.Trace(err)
}
r.lockId = oracle.ComposeTS(p, l)
exist, err := r.storage.FileExists(ctx, CheckpointLockPath)
if err != nil {
return errors.Trace(err)
}
if exist {
if err := r.checkLockFile(ctx, p); err != nil {
return errors.Trace(err)
}
}
if err = r.flushLock(ctx, p); err != nil {
return errors.Trace(err)
}

// wait for 3 seconds to check whether the lock file is overwritten by another BR
time.Sleep(3 * time.Second)
err = r.checkLockFile(ctx, p)
return errors.Trace(err)
}

// walk the whole checkpoint range files and retrieve the metadatat of backed up ranges
// and return the total time cost in the past executions
func WalkCheckpointFile(ctx context.Context, s storage.ExternalStorage, cipher *backuppb.CipherInfo, fn func(groupKey string, rg *rtree.Range)) (time.Duration, error) {
Expand Down
Loading

0 comments on commit e0d6c57

Please sign in to comment.