Skip to content

Commit

Permalink
Merge branch 'master' into fix-39719
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Dec 8, 2022
2 parents e6ae3ff + 5348bb3 commit b32c29e
Show file tree
Hide file tree
Showing 28 changed files with 5,658 additions and 5,347 deletions.
23 changes: 23 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,29 @@ func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metau
backupCol.Name, backupCol.FieldType.String())
}
}

if backupTi.Name.L == sysUserTableName {
// check whether the columns of table in cluster are less than the backup data
clusterColMap := make(map[string]*model.ColumnInfo)
for i := range ti.Columns {
col := ti.Columns[i]
clusterColMap[col.Name.L] = col
}
// order can be different
for i := range backupTi.Columns {
col := backupTi.Columns[i]
clusterCol := clusterColMap[col.Name.L]
if clusterCol == nil {
log.Error("missing column in cluster data",
zap.Stringer("table", table.Info.Name),
zap.String("col", fmt.Sprintf("%s %s", col.Name, col.FieldType.String())))
return errors.Annotatef(berrors.ErrRestoreIncompatibleSys,
"missing column in cluster data, table: %s, col: %s %s",
table.Info.Name.O,
col.Name, col.FieldType.String())
}
}
}
}
return nil
}
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,16 @@ func TestCheckSysTableCompatibility(t *testing.T) {
Info: mockedUserTI,
}})
require.NoError(t, err)
userTI.Columns = userTI.Columns[:len(userTI.Columns)-1]

// user table in cluster have less columns(failed)
mockedUserTI = userTI.Clone()
mockedUserTI.Columns = append(mockedUserTI.Columns, &model.ColumnInfo{Name: model.NewCIStr("new-name")})
err = client.CheckSysTableCompatibility(cluster.Domain, []*metautil.Table{{
DB: tmpSysDB,
Info: mockedUserTI,
}})
require.True(t, berrors.ErrRestoreIncompatibleSys.Equal(err))

// column order mismatch(success)
mockedUserTI = userTI.Clone()
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (t trivialFlushStream) Recv() (*logbackup.SubscribeFlushEventResponse, erro
return &item, nil
default:
}
return nil, t.cx.Err()
return nil, status.Error(codes.Canceled, t.cx.Err().Error())
}
}

Expand Down
10 changes: 6 additions & 4 deletions br/pkg/streamhelper/flush_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,20 +211,22 @@ func (s *subscription) connect(ctx context.Context, dialer LogBackupService) {

func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) error {
log.Info("[log backup subscription manager] Adding subscription.", zap.Uint64("store", s.storeID), zap.Uint64("boot", s.storeBootAt))
s.clearError()
// We should shutdown the background task firstly.
// Once it yields some error during shuting down, the error won't be brought to next run.
s.close()
s.clearError()

c, err := dialer.GetLogBackupClient(ctx, s.storeID)
if err != nil {
return err
return errors.Annotate(err, "failed to get log backup client")
}
cx, cancel := context.WithCancel(ctx)
cli, err := c.SubscribeFlushEvent(cx, &logbackup.SubscribeFlushEventRequest{
ClientId: uuid.NewString(),
})
if err != nil {
cancel()
return err
return errors.Annotate(err, "failed to subscribe events")
}
s.cancel = cancel
s.background = spawnJoinable(func() { s.listenOver(cli) })
Expand All @@ -249,7 +251,7 @@ func (s *subscription) listenOver(cli eventStream) {
msg, err := cli.Recv()
if err != nil {
log.Info("[log backup flush subscriber] Listen stopped.", zap.Uint64("store", storeID), logutil.ShortError(err))
if err == io.EOF || err == context.Canceled {
if err == io.EOF || err == context.Canceled || status.Code(err) == codes.Canceled {
return
}
s.emitError(errors.Annotatef(err, "while receiving from store id %d", storeID))
Expand Down
21 changes: 21 additions & 0 deletions br/pkg/streamhelper/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func installSubscribeSupport(c *fakeCluster) {
Expand Down Expand Up @@ -105,6 +107,25 @@ func TestHasFailureStores(t *testing.T) {
req.NoError(sub.PendingErrors())
}

func TestStoreOffline(t *testing.T) {
req := require.New(t)
ctx := context.Background()
c := createFakeCluster(t, 4, true)
c.splitAndScatter("0001", "0002", "0003", "0008", "0009")
installSubscribeSupport(c)

c.onGetClient = func(u uint64) error {
return status.Error(codes.DataLoss, "upon an eclipsed night, some of data (not all data) have fled from the dataset")
}
sub := streamhelper.NewSubscriber(c, c)
req.NoError(sub.UpdateStoreTopology(ctx))
req.Error(sub.PendingErrors())

c.onGetClient = nil
sub.HandleErrors(ctx)
req.NoError(sub.PendingErrors())
}

func TestStoreRemoved(t *testing.T) {
req := require.New(t)
ctx := context.Background()
Expand Down
3 changes: 2 additions & 1 deletion br/tests/br_full_cluster_restore/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ restart_services
# mock incompatible manually
run_sql "alter table mysql.user add column xx int;"
run_br restore full --with-sys-table --log-file $br_log_file -s "local://$backup_dir" > $res_file 2>&1 || true
check_contains "the target cluster is not compatible with the backup data"
run_sql "select count(*) from mysql.user"
check_contains "count(*): 6"

echo "--> incompatible system table: less column on target cluster"
restart_services
Expand Down
9 changes: 8 additions & 1 deletion ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table,
// Build reorg tasks.
job := reorgInfo.Job
for i, keyRange := range kvRanges {
startKey := keyRange.StartKey
endKey := keyRange.EndKey
endK, err := getRangeEndKey(scheduler.jobCtx, dc.store, job.Priority, prefix, keyRange.StartKey, endKey)
if err != nil {
Expand All @@ -517,11 +518,17 @@ func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table,
zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK)))
endKey = endK
}
if len(startKey) == 0 {
startKey = prefix
}
if len(endKey) == 0 {
endKey = prefix.PrefixNext()
}

task := &reorgBackfillTask{
id: i,
physicalTableID: physicalTableID,
startKey: keyRange.StartKey,
startKey: startKey,
endKey: endKey,
// If the boundaries overlap, we should ignore the preceding endKey.
endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1}
Expand Down
43 changes: 42 additions & 1 deletion ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
Expand All @@ -36,6 +38,8 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/generic"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/timeutil"
Expand Down Expand Up @@ -120,6 +124,10 @@ type copReqSender struct {
func (c *copReqSender) run() {
p := c.senderPool
defer p.wg.Done()
var curTaskID int
defer util.Recover(metrics.LabelDDL, "copReqSender.run", func() {
p.resultsCh <- idxRecResult{id: curTaskID, err: dbterror.ErrReorgPanic}
}, false)
for {
if util.HasCancelled(c.ctx) {
return
Expand All @@ -128,13 +136,19 @@ func (c *copReqSender) run() {
if !ok {
return
}
curTaskID = task.id
logutil.BgLogger().Info("[ddl-ingest] start a cop-request task",
zap.Int("id", task.id), zap.String("task", task.String()))
rs, err := p.copCtx.buildTableScan(p.ctx, p.startTS, task.startKey, task.excludedEndKey())
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
return
}
failpoint.Inject("MockCopSenderPanic", func(val failpoint.Value) {
if val.(bool) {
panic("mock panic")
}
})
var done bool
var total int
for !done {
Expand Down Expand Up @@ -437,12 +451,39 @@ func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.Se
if err != nil {
return nil, false, errors.Trace(err)
}
rsData := tables.TryGetHandleRestoredDataWrapper(c.tblInfo, hdDt, nil, c.idxInfo)
rsData := getRestoreData(c.tblInfo, c.idxInfo, c.pkInfo, hdDt)
buf = append(buf, &indexRecord{handle: handle, key: nil, vals: idxDt, rsData: rsData, skip: false})
}
return buf, false, nil
}

func getRestoreData(tblInfo *model.TableInfo, targetIdx, pkIdx *model.IndexInfo, handleDts []types.Datum) []types.Datum {
if !collate.NewCollationEnabled() || !tblInfo.IsCommonHandle || tblInfo.CommonHandleVersion == 0 {
return nil
}
if pkIdx == nil {
return nil
}
for i, pkIdxCol := range pkIdx.Columns {
pkCol := tblInfo.Columns[pkIdxCol.Offset]
if !types.NeedRestoredData(&pkCol.FieldType) {
// Since the handle data cannot be null, we can use SetNull to
// indicate that this column does not need to be restored.
handleDts[i].SetNull()
continue
}
tables.TryTruncateRestoredData(&handleDts[i], pkCol, pkIdxCol, targetIdx)
tables.ConvertDatumToTailSpaceCount(&handleDts[i], pkCol)
}
dtToRestored := handleDts[:0]
for _, handleDt := range handleDts {
if !handleDt.IsNull() {
dtToRestored = append(dtToRestored, handleDt)
}
}
return dtToRestored
}

func buildDAGPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(sCtx.GetSessionVars().Location())
Expand Down
4 changes: 4 additions & 0 deletions executor/oomtest/oom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ func (h *oomCapture) Write(entry zapcore.Entry, fields []zapcore.Field) error {
h.tracker = str[begin+len("8001]") : end]
return nil
}
// They are just common background task and not related to the oom.
if entry.Message == "SetTiFlashGroupConfig" {
return nil
}

h.mu.Lock()
h.tracker = entry.Message
Expand Down
29 changes: 22 additions & 7 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,14 +946,23 @@ func (e *SimpleExec) executeCreateUser(ctx context.Context, s *ast.CreateUserStm
passwdlockinfo.passwordExpired = "Y"
}

var userAttributes any = nil
var userAttributes []string

if s.CommentOrAttributeOption != nil {
if s.CommentOrAttributeOption.Type == ast.UserCommentType {
userAttributes = fmt.Sprintf("{\"metadata\": {\"comment\": \"%s\"}}", s.CommentOrAttributeOption.Value)
userAttributes = append(userAttributes, fmt.Sprintf("\"metadata\": {\"comment\": \"%s\"}", s.CommentOrAttributeOption.Value))
} else if s.CommentOrAttributeOption.Type == ast.UserAttributeType {
userAttributes = fmt.Sprintf("{\"metadata\": %s}", s.CommentOrAttributeOption.Value)
userAttributes = append(userAttributes, fmt.Sprintf("\"metadata\": %s", s.CommentOrAttributeOption.Value))
}
}
resourceGroupName := "default"
if s.ResourceGroupNameOption != nil {
if s.ResourceGroupNameOption.Type == ast.UserResourceGroupName {
resourceGroupName = s.ResourceGroupNameOption.Value
}
}
userAttributes = append(userAttributes, fmt.Sprintf("\"resource_group\": \"%s\"", resourceGroupName))
userAttributesStr := fmt.Sprintf("{%s}", strings.Join(userAttributes, ","))

tokenIssuer := ""
for _, authTokenOption := range s.AuthTokenOrTLSOptions {
Expand Down Expand Up @@ -1043,7 +1052,7 @@ func (e *SimpleExec) executeCreateUser(ctx context.Context, s *ast.CreateUserStm
}

hostName := strings.ToLower(spec.User.Hostname)
sqlexec.MustFormatSQL(sql, valueTemplate, hostName, spec.User.Username, pwd, authPlugin, userAttributes, passwdlockinfo.lockAccount, recordTokenIssuer, passwdlockinfo.passwordExpired, passwdlockinfo.passwordLifetime)
sqlexec.MustFormatSQL(sql, valueTemplate, hostName, spec.User.Username, pwd, authPlugin, userAttributesStr, passwdlockinfo.lockAccount, recordTokenIssuer, passwdlockinfo.passwordExpired, passwdlockinfo.passwordLifetime)
// add Password_reuse_time value.
if passwdlockinfo.passwordReuseInterval != notSpecified {
sqlexec.MustFormatSQL(sql, `, %?`, passwdlockinfo.passwordReuseInterval)
Expand Down Expand Up @@ -1583,13 +1592,19 @@ func (e *SimpleExec) executeAlterUser(ctx context.Context, s *ast.AlterUserStmt)
fields = append(fields, alterField{"password_lifetime=%?", passwdlockinfo.passwordLifetime})
}

var newAttributes []string
if s.CommentOrAttributeOption != nil {
newAttributesStr := ""
if s.CommentOrAttributeOption.Type == ast.UserCommentType {
newAttributesStr = fmt.Sprintf(`{"metadata": {"comment": "%s"}}`, s.CommentOrAttributeOption.Value)
newAttributes = append(newAttributes, fmt.Sprintf(`"metadata": {"comment": "%s"}`, s.CommentOrAttributeOption.Value))
} else {
newAttributesStr = fmt.Sprintf(`{"metadata": %s}`, s.CommentOrAttributeOption.Value)
newAttributes = append(newAttributes, fmt.Sprintf(`"metadata": %s`, s.CommentOrAttributeOption.Value))
}
}
if s.ResourceGroupNameOption != nil && s.ResourceGroupNameOption.Type == ast.UserResourceGroupName {
newAttributes = append(newAttributes, fmt.Sprintf(`"resource_group": "%s"`, s.ResourceGroupNameOption.Value))
}
if len(newAttributes) > 0 {
newAttributesStr := fmt.Sprintf("{%s}", strings.Join(newAttributes, ","))
fields = append(fields, alterField{"user_attributes=json_merge_patch(coalesce(user_attributes, '{}'), %?)", newAttributesStr})
}

Expand Down
10 changes: 6 additions & 4 deletions executor/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ func TestUserAttributes(t *testing.T) {
_, err := rootTK.Exec(`CREATE USER testuser2 ATTRIBUTE '{"name": "Tom", age: 19}'`)
rootTK.MustExec(`CREATE USER testuser2`)
require.Error(t, err)
rootTK.MustQuery(`SELECT user_attributes FROM mysql.user WHERE user = 'testuser'`).Check(testkit.Rows(`{"metadata": {"comment": "1234"}}`))
rootTK.MustQuery(`SELECT user_attributes FROM mysql.user WHERE user = 'testuser1'`).Check(testkit.Rows(`{"metadata": {"age": 19, "name": "Tom"}}`))
rootTK.MustQuery(`SELECT user_attributes FROM mysql.user WHERE user = 'testuser2'`).Check(testkit.Rows(`<nil>`))
rootTK.MustQuery(`SELECT user_attributes FROM mysql.user WHERE user = 'testuser'`).Check(testkit.Rows(`{"metadata": {"comment": "1234"}, "resource_group": "default"}`))
rootTK.MustQuery(`SELECT user_attributes FROM mysql.user WHERE user = 'testuser1'`).Check(testkit.Rows(`{"metadata": {"age": 19, "name": "Tom"}, "resource_group": "default"}`))
rootTK.MustQuery(`SELECT user_attributes FROM mysql.user WHERE user = 'testuser2'`).Check(testkit.Rows(`{"resource_group": "default"}`))
rootTK.MustQueryWithContext(ctx, `SELECT attribute FROM information_schema.user_attributes WHERE user = 'testuser'`).Check(testkit.Rows(`{"comment": "1234"}`))
rootTK.MustQueryWithContext(ctx, `SELECT attribute FROM information_schema.user_attributes WHERE user = 'testuser1'`).Check(testkit.Rows(`{"age": 19, "name": "Tom"}`))
rootTK.MustQueryWithContext(ctx, `SELECT attribute->>"$.age" AS age, attribute->>"$.name" AS name FROM information_schema.user_attributes WHERE user = 'testuser1'`).Check(testkit.Rows(`19 Tom`))
Expand Down Expand Up @@ -127,7 +127,9 @@ func TestUserAttributes(t *testing.T) {
// https://github.com/pingcap/tidb/issues/39207
rootTK.MustExec("create user usr1@'%' identified by 'passord'")
rootTK.MustExec("alter user usr1 comment 'comment1'")
rootTK.MustQuery("select user_attributes from mysql.user where user = 'usr1'").Check(testkit.Rows(`{"metadata": {"comment": "comment1"}}`))
rootTK.MustQuery("select user_attributes from mysql.user where user = 'usr1'").Check(testkit.Rows(`{"metadata": {"comment": "comment1"}, "resource_group": "default"}`))
rootTK.MustExec("alter user usr1 resource group 'rg1'")
rootTK.MustQuery("select user_attributes from mysql.user where user = 'usr1'").Check(testkit.Rows(`{"metadata": {"comment": "comment1"}, "resource_group": "rg1"}`))
}

func TestValidatePassword(t *testing.T) {
Expand Down
Loading

0 comments on commit b32c29e

Please sign in to comment.