Skip to content

Commit

Permalink
Merge branch 'master' into support_continus_capture
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Dec 15, 2022
2 parents 1d71f12 + 79b4e63 commit 58aa636
Show file tree
Hide file tree
Showing 44 changed files with 4,670 additions and 4,011 deletions.
87 changes: 61 additions & 26 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -285,14 +286,18 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
c.task = e.Info
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0))
c.lastCheckpoint = e.Info.StartTs
log.Info("added event", zap.Stringer("task", e.Info), zap.Stringer("ranges", logutil.StringifyKeys(c.taskRange)))
case EventDel:
utils.LogBackupTaskCountDec()
c.task = nil
c.taskRange = nil
c.checkpoints = nil
// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
c.subscriber.Clear()
// Do the null check because some of test cases won't equip the advancer with subscriber.
if c.subscriber != nil {
c.subscriber.Clear()
}
if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
}
Expand All @@ -303,31 +308,34 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
return nil
}

func (c *CheckpointAdvancer) setCheckpoint(cp uint64) bool {
if cp < c.lastCheckpoint {
log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
return false
}
if cp <= c.lastCheckpoint {
return false
}
c.lastCheckpoint = cp
return true
}

// advanceCheckpointBy advances the checkpoint by a checkpoint getter function.
func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, getCheckpoint func(context.Context) (uint64, error)) error {
start := time.Now()
cp, err := getCheckpoint(ctx)
if err != nil {
return err
}
log.Info("get checkpoint", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
if cp < c.lastCheckpoint {
log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
}
if cp <= c.lastCheckpoint {
return nil
}

log.Info("uploading checkpoint for task",
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)),
zap.Uint64("checkpoint", cp),
zap.String("task", c.task.Name),
zap.Stringer("take", time.Since(start)))
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, cp); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
if c.setCheckpoint(cp) {
log.Info("uploading checkpoint for task",
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)),
zap.Uint64("checkpoint", cp),
zap.String("task", c.task.Name),
zap.Stringer("take", time.Since(start)))
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint))
}
c.lastCheckpoint = cp
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint))
return nil
}

Expand Down Expand Up @@ -375,16 +383,17 @@ func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {
return c.subscriber.PendingErrors()
}

func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
if c.task == nil {
log.Debug("No tasks yet, skipping advancing.")
return nil
func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
c.checkpointsMu.Lock()
c.setCheckpoint(c.checkpoints.MinValue())
c.checkpointsMu.Unlock()
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
}
cx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout())
defer cancel()
return nil
}

func (c *CheckpointAdvancer) optionalTick(cx context.Context) error {
threshold := c.Config().GetDefaultStartPollThreshold()
if err := c.subscribeTick(cx); err != nil {
log.Warn("[log backup advancer] Subscriber meet error, would polling the checkpoint.", logutil.ShortError(err))
Expand All @@ -397,6 +406,32 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
if err != nil {
return err
}

return nil
}

func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
if c.task == nil {
log.Debug("No tasks yet, skipping advancing.")
return nil
}

var errs error

cx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout())
defer cancel()
err := c.optionalTick(cx)
if err != nil {
log.Warn("[log backup advancer] option tick failed.", logutil.ShortError(err))
errs = multierr.Append(errs, err)
}

err = c.importantTick(ctx)
if err != nil {
log.Warn("[log backup advancer] important tick failed.", logutil.ShortError(err))
errs = multierr.Append(errs, err)
}

return errs
}
38 changes: 37 additions & 1 deletion br/pkg/streamhelper/advancer_cliext.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ package streamhelper
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"strings"

"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

type EventType int
Expand Down Expand Up @@ -181,11 +185,43 @@ func (t AdvancerExt) Begin(ctx context.Context, ch chan<- TaskEvent) error {
return nil
}

func (t AdvancerExt) getGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error) {
key := GlobalCheckpointOf(taskName)
resp, err := t.KV.Get(ctx, key)
if err != nil {
return 0, err
}

if len(resp.Kvs) == 0 {
return 0, nil
}

firstKV := resp.Kvs[0]
value := firstKV.Value
if len(value) != 8 {
return 0, errors.Annotatef(berrors.ErrPiTRMalformedMetadata,
"the global checkpoint isn't 64bits (it is %d bytes, value = %s)",
len(value),
redact.Key(value))
}

return binary.BigEndian.Uint64(value), nil
}

func (t AdvancerExt) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error {
key := GlobalCheckpointOf(taskName)
value := string(encodeUint64(checkpoint))
_, err := t.KV.Put(ctx, key, value)
oldValue, err := t.getGlobalCheckpointForTask(ctx, taskName)
if err != nil {
return err
}

if checkpoint < oldValue {
log.Warn("[log backup advancer] skipping upload global checkpoint", zap.Uint64("old", oldValue), zap.Uint64("new", checkpoint))
return nil
}

_, err = t.KV.Put(ctx, key, value)
if err != nil {
return err
}
Expand Down
15 changes: 11 additions & 4 deletions br/pkg/streamhelper/config/advancer_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
flagTryAdvanceThreshold = "try-advance-threshold"

DefaultConsistencyCheckTick = 5
DefaultTryAdvanceThreshold = 9 * time.Minute
DefaultTryAdvanceThreshold = 4 * time.Minute
DefaultBackOffTime = 5 * time.Second
DefaultTickInterval = 12 * time.Second
DefaultFullScanTick = 4
Expand Down Expand Up @@ -76,11 +76,18 @@ func (conf Config) GetDefaultStartPollThreshold() time.Duration {
// GetSubscriberErrorStartPollThreshold returns the threshold of begin polling the checkpoint
// when the subscriber meets error.
func (conf Config) GetSubscriberErrorStartPollThreshold() time.Duration {
return conf.TryAdvanceThreshold / 5
// 0.45x of the origin threshold.
// The origin threshold is 0.8x the target RPO,
// and the default flush interval is about 0.5x the target RPO.
// So the relationship between the RPO and the threshold is:
// When subscription is all available, it is 1.7x of the flush interval (which allow us to save in abnormal condition).
// When some of subscriptions are not available, it is 0.75x of the flush interval.
// NOTE: can we make subscription better and give up the poll model?
return conf.TryAdvanceThreshold * 9 / 20
}

// TickTimeout returns the max duration for each tick.
func (conf Config) TickTimeout() time.Duration {
// If a tick blocks 10x the interval of ticking, we may need to break it and retry.
return 10 * conf.TickDuration
// If a tick blocks longer than the interval of ticking, we may need to break it and retry.
return conf.TickDuration
}
8 changes: 5 additions & 3 deletions br/pkg/streamhelper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,10 +352,12 @@ func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) {
req.Len(resp.Kvs, 1)
return binary.BigEndian.Uint64(resp.Kvs[0].Value)
}
metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 5)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 5))
req.EqualValues(5, getCheckpoint())
metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 18)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 18))
req.EqualValues(18, getCheckpoint())
metaCli.ClearV3GlobalCheckpointForTask(ctx, task)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 16))
req.EqualValues(18, getCheckpoint())
req.NoError(metaCli.ClearV3GlobalCheckpointForTask(ctx, task))
req.EqualValues(0, getCheckpoint())
}
2 changes: 1 addition & 1 deletion br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ func restoreStream(
return errors.Trace(err)
}
defer func() {
if err = restoreGc(); err != nil {
if err := restoreGc(); err != nil {
log.Error("failed to set gc enabled", zap.Error(err))
}
}()
Expand Down
37 changes: 25 additions & 12 deletions ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type temporaryIndexRecord struct {
delete bool
unique bool
distinct bool
rowKey kv.Key
}

type mergeIndexWorker struct {
Expand Down Expand Up @@ -133,6 +134,14 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC
if idxRecord.skip {
continue
}

// Lock the corresponding row keys so that it doesn't modify the index KVs
// that are changing by a pessimistic transaction.
err := txn.LockKeys(context.Background(), new(kv.LockCtx), idxRecord.rowKey)
if err != nil {
return errors.Trace(err)
}

if idxRecord.delete {
if idxRecord.unique {
err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked)
Expand All @@ -149,6 +158,7 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC
}
return nil
})

logSlowOperations(time.Since(oprStartTime), "AddIndexMergeDataInTxn", 3000)
return
}
Expand All @@ -166,6 +176,7 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
oprStartTime := startTime
idxPrefix := w.table.IndexPrefix()
var lastKey kv.Key
isCommonHandle := w.table.Meta().IsCommonHandle
err := iterateSnapshotKeys(w.reorgInfo.d.jobContext(w.reorgInfo.Job), w.sessCtx.GetStore(), w.priority, idxPrefix, txn.StartTS(),
taskRange.startKey, taskRange.endKey, func(_ kv.Handle, indexKey kv.Key, rawValue []byte) (more bool, err error) {
oprEndTime := time.Now()
Expand All @@ -182,35 +193,37 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
return false, nil
}

isDelete := false
unique := false
length := len(rawValue)
keyVer := rawValue[length-1]
originVal, handle, isDelete, unique, keyVer := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle)
if keyVer == tables.TempIndexKeyTypeMerge || keyVer == tables.TempIndexKeyTypeDelete {
// For 'm' version kvs, they are double-written.
// For 'd' version kvs, they are written in the delete-only state and can be dropped safely.
return true, nil
}
rawValue = rawValue[:length-1]
if bytes.Equal(rawValue, tables.DeleteMarker) {
isDelete = true
} else if bytes.Equal(rawValue, tables.DeleteMarkerUnique) {
isDelete = true
unique = true

if handle == nil {
// If the handle is not found in the value of the temp index, it means
// 1) This is not a deletion marker, the handle is in the key or the origin value.
// 2) This is a deletion marker, but the handle is in the key of temp index.
handle, err = tablecodec.DecodeIndexHandle(indexKey, originVal, len(w.index.Meta().Columns))
if err != nil {
return false, err
}
}
rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), handle)

originIdxKey := make([]byte, len(indexKey))
copy(originIdxKey, indexKey)
tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey)

idxRecord := &temporaryIndexRecord{
rowKey: rowKey,
delete: isDelete,
unique: unique,
skip: false,
}
if !isDelete {
idxRecord.vals = rawValue
idxRecord.distinct = tablecodec.IndexKVIsUnique(rawValue)
idxRecord.vals = originVal
idxRecord.distinct = tablecodec.IndexKVIsUnique(originVal)
}
w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord)
w.originIdxKeys = append(w.originIdxKeys, originIdxKey)
Expand Down
Loading

0 comments on commit 58aa636

Please sign in to comment.