Skip to content

Commit

Permalink
Merge branch 'master' into repertoire
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid authored Dec 10, 2021
2 parents 88dcdbb + 828c1dd commit 74ec492
Show file tree
Hide file tree
Showing 47 changed files with 15,147 additions and 13,614 deletions.
17 changes: 0 additions & 17 deletions .github/workflows/dumpling_integration_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,6 @@ concurrency:
cancel-in-progress: true

jobs:
unit-test:
runs-on: ubuntu-latest
timeout-minutes: 15
strategy:
fail-fast: true
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 2
- name: Set up Go 1.16
uses: actions/setup-go@v2
with:
go-version: 1.16
- name: Unit test
run: make dumpling_unit_test WITH_RACE=1
- uses: codecov/codecov-action@v1

integration-test-mysql-5735:
runs-on: ubuntu-latest
timeout-minutes: 15
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ gotest_in_verify_ci_part_2: failpoint-enable tools/bin/gotestsum tools/bin/gocov

race: failpoint-enable
@export log_level=debug; \
$(GOTEST) -timeout 20m -race $(PACKAGES) || { $(FAILPOINT_DISABLE); exit 1; }
$(GOTEST) -timeout 25m -race $(PACKAGES) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

leak: failpoint-enable
Expand Down
11 changes: 10 additions & 1 deletion br/pkg/restore/split_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,11 @@ func (c *pdClient) getMaxReplica(ctx context.Context) (int, error) {
if err != nil {
return 0, errors.Trace(err)
}
defer func() {
if err = res.Body.Close(); err != nil {
log.Error("Response fail to close", zap.Error(err))
}
}()
var conf config.Config
if err := json.NewDecoder(res.Body).Decode(&conf); err != nil {
return 0, errors.Trace(err)
Expand Down Expand Up @@ -482,11 +487,15 @@ func (c *pdClient) GetPlacementRule(ctx context.Context, groupID, ruleID string)
if err != nil {
return rule, errors.Trace(err)
}
defer func() {
if err = res.Body.Close(); err != nil {
log.Error("Response fail to close", zap.Error(err))
}
}()
b, err := io.ReadAll(res.Body)
if err != nil {
return rule, errors.Trace(err)
}
res.Body.Close()
err = json.Unmarshal(b, &rule)
if err != nil {
return rule, errors.Trace(err)
Expand Down
2 changes: 0 additions & 2 deletions br/tests/lightning_error_summary/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

set -eux

# skip for temporary due to checksum for table a,c succeed, but expect to fail.
exit 0
# Check that error summary are written at the bottom of import.
run_sql 'DROP DATABASE IF EXISTS tidb_lightning_checkpoint_error_summary;'

Expand Down
46 changes: 46 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/pingcap/tidb/util/domainutil"
"github.com/pingcap/tidb/util/israce"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
"github.com/tikv/client-go/v2/testutils"
Expand Down Expand Up @@ -7290,6 +7291,51 @@ func (s *testSerialDBSuite) TestJsonUnmarshalErrWhenPanicInCancellingPath(c *C)
c.Assert(err.Error(), Equals, "[kv:1062]Duplicate entry '0' for key 'cc'")
}

// Close issue #24172.
// See https://github.com/pingcap/tidb/issues/24172
func (s *testSerialDBSuite) TestCancelJobWriteConflict(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int)")

var cancelErr error
var rs []sqlexec.RecordSet
hook := &ddl.TestDDLCallback{}
d := s.dom.DDL()
originalHook := d.GetHook()
d.(ddl.DDLForTest).SetHook(hook)
defer d.(ddl.DDLForTest).SetHook(originalHook)

// Test when cancelling cannot be retried and adding index succeeds.
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization {
stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`), IsNil)
defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn"), IsNil) }()
rs, cancelErr = tk1.Se.Execute(context.Background(), stmt)
}
}
tk.MustExec("alter table t add index (id)")
c.Assert(cancelErr.Error(), Equals, "mock commit error")

// Test when cancelling is retried only once and adding index is cancelled in the end.
var jobID int64
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization {
jobID = job.ID
stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("retry_once")`), IsNil)
defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn"), IsNil) }()
rs, cancelErr = tk1.Se.Execute(context.Background(), stmt)
}
}
tk.MustGetErrCode("alter table t add index (id)", errno.ErrCancelledDDLJob)
c.Assert(cancelErr, IsNil)
result := tk1.ResultSetToResultWithCtx(context.Background(), rs[0], Commentf("cancel ddl job fails"))
result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID)))
}

// For Close issue #24288
// see https://github.com/pingcap/tidb/issues/24288
func (s *testDBSuite8) TestDdlMaxLimitOfIdentifier(c *C) {
Expand Down
8 changes: 4 additions & 4 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
return ver, errors.Trace(err)
}

if err = infosync.PutRuleBundles(context.TODO(), bundles); err != nil {
if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
Expand Down Expand Up @@ -1040,7 +1040,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
if job.Type == model.ActionAddTablePartition {
// It is rollbacked from adding table partition, just remove addingDefinitions from tableInfo.
physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo)
err = infosync.PutRuleBundles(context.TODO(), rollbackBundles)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down Expand Up @@ -1208,7 +1208,7 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
return ver, errors.Trace(err)
}

err = infosync.PutRuleBundles(context.TODO(), bundles)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down Expand Up @@ -1412,7 +1412,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
return ver, errors.Trace(err)
}

if err = infosync.PutRuleBundles(context.TODO(), bundles); err != nil {
if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/placement_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func onAlterPlacementPolicy(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
cp := bundle.Clone()
bundles = append(bundles, cp.Reset(placement.RuleIndexPartition, []int64{id}))
}
err = infosync.PutRuleBundles(context.TODO(), bundles)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down
8 changes: 4 additions & 4 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
}

// Send the placement bundle to PD.
err = infosync.PutRuleBundles(context.TODO(), bundles)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down Expand Up @@ -580,7 +580,7 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro
return ver, errors.Trace(err)
}

err = infosync.PutRuleBundles(context.TODO(), bundles)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down Expand Up @@ -1302,7 +1302,7 @@ func onAlterTablePartitionOptions(d *ddlCtx, t *meta.Meta, job *model.Job) (ver

// Send the placement bundle to PD.
if bundle != nil {
err = infosync.PutRuleBundles(context.TODO(), []*placement.Bundle{bundle})
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), []*placement.Bundle{bundle})
}

if err != nil {
Expand Down Expand Up @@ -1353,7 +1353,7 @@ func onAlterTablePlacement(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,

// Send the placement bundle to PD.
if bundle != nil {
err = infosync.PutRuleBundles(context.TODO(), []*placement.Bundle{bundle})
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), []*placement.Bundle{bundle})
}

if err != nil {
Expand Down
28 changes: 28 additions & 0 deletions domain/infosync/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package infosync

import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/dbterror"
)

var (
// ErrHTTPServiceError means we got a http response with a status code which is not '2xx'
ErrHTTPServiceError = dbterror.ClassDomain.NewStdErr(
errno.ErrHTTPServiceError, mysql.Message("HTTP request failed with status %s", nil),
)
)
41 changes: 40 additions & 1 deletion domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ const (
TopologyPrometheus = "/topology/prometheus"
// TablePrometheusCacheExpiry is the expiry time for prometheus address cache.
TablePrometheusCacheExpiry = 10 * time.Second
// RequestRetryInterval is the sleep time before next retry for http request
RequestRetryInterval = 200 * time.Millisecond
// SyncBundlesMaxRetry is the max retry times for sync placement bundles
SyncBundlesMaxRetry = 3
)

// ErrPrometheusAddrIsNotSet is the error that Prometheus address is not set in PD and etcd
Expand Down Expand Up @@ -353,7 +357,7 @@ func doRequest(ctx context.Context, addrs []string, route, method string, body i
return nil, err
}
if res.StatusCode != http.StatusOK {
err = errors.Errorf("%s", bodyBytes)
err = ErrHTTPServiceError.FastGen("%s", bodyBytes)
if res.StatusCode == http.StatusNotFound || res.StatusCode == http.StatusPreconditionFailed {
err = nil
bodyBytes = nil
Expand Down Expand Up @@ -427,6 +431,16 @@ func GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error)

// PutRuleBundles is used to post specific rule bundles to PD.
func PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error {
failpoint.Inject("putRuleBundlesError", func(isServiceError failpoint.Value) {
var err error
if isServiceError.(bool) {
err = ErrHTTPServiceError.FastGen("mock service error")
} else {
err = errors.New("mock other error")
}
failpoint.Return(err)
})

is, err := getGlobalInfoSyncer()
if err != nil {
return err
Expand All @@ -435,6 +449,31 @@ func PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error {
return is.placementManager.PutRuleBundles(ctx, bundles)
}

// PutRuleBundlesWithRetry will retry for specified times when PutRuleBundles failed
func PutRuleBundlesWithRetry(ctx context.Context, bundles []*placement.Bundle, maxRetry int, interval time.Duration) (err error) {
if maxRetry < 0 {
maxRetry = 0
}

for i := 0; i <= maxRetry; i++ {
if err = PutRuleBundles(ctx, bundles); err == nil || ErrHTTPServiceError.Equal(err) {
return err
}

if i != maxRetry {
logutil.BgLogger().Warn("Error occurs when PutRuleBundles, retry", zap.Error(err))
time.Sleep(interval)
}
}

return
}

// PutRuleBundlesWithDefaultRetry will retry for default times
func PutRuleBundlesWithDefaultRetry(ctx context.Context, bundles []*placement.Bundle) (err error) {
return PutRuleBundlesWithRetry(ctx, bundles, SyncBundlesMaxRetry, RequestRetryInterval)
}

func (is *InfoSyncer) getAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
allInfo := make(map[string]*ServerInfo)
if is.etcdCli == nil {
Expand Down
65 changes: 65 additions & 0 deletions domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/testbridge"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/integration"
Expand Down Expand Up @@ -145,3 +147,66 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) {
}
return len(resp.Kvs) == 1, nil
}

func TestPutBundlesRetry(t *testing.T) {
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, false)
require.NoError(t, err)

bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"})
require.NoError(t, err)
bundle = bundle.Reset(placement.RuleIndexTable, []int64{1024})

t.Run("serviceErrorShouldNotRetry", func(t *testing.T) {
require.NoError(t, PutRuleBundles(context.TODO(), []*placement.Bundle{{ID: bundle.ID}}))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError", "1*return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError"))
}()

err := PutRuleBundlesWithRetry(context.TODO(), []*placement.Bundle{bundle}, 3, time.Millisecond)
require.Error(t, err)
require.Equal(t, "[domain:8243]mock service error", err.Error())

got, err := GetRuleBundle(context.TODO(), bundle.ID)
require.NoError(t, err)
require.True(t, got.IsEmpty())
})

t.Run("nonServiceErrorShouldRetry", func(t *testing.T) {
require.NoError(t, PutRuleBundles(context.TODO(), []*placement.Bundle{{ID: bundle.ID}}))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError", "3*return(false)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError"))
}()

err := PutRuleBundlesWithRetry(context.TODO(), []*placement.Bundle{bundle}, 3, time.Millisecond)
require.NoError(t, err)

got, err := GetRuleBundle(context.TODO(), bundle.ID)
require.NoError(t, err)

gotJSON, err := json.Marshal(got)
require.NoError(t, err)

expectJSON, err := json.Marshal(bundle)
require.NoError(t, err)

require.Equal(t, expectJSON, gotJSON)
})

t.Run("nonServiceErrorRetryAndFail", func(t *testing.T) {
require.NoError(t, PutRuleBundles(context.TODO(), []*placement.Bundle{{ID: bundle.ID}}))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError", "4*return(false)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError"))
}()

err := PutRuleBundlesWithRetry(context.TODO(), []*placement.Bundle{bundle}, 3, time.Millisecond)
require.Error(t, err)
require.Equal(t, "mock other error", err.Error())

got, err := GetRuleBundle(context.TODO(), bundle.ID)
require.NoError(t, err)
require.True(t, got.IsEmpty())
})
}
Loading

0 comments on commit 74ec492

Please sign in to comment.