Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#47322
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
3pointer authored and ti-chi-bot committed Sep 28, 2023
1 parent 4d91eb1 commit 27a0351
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 8 deletions.
14 changes: 8 additions & 6 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,12 +1076,14 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3.
}

// check cdc changefeed
nameSet, err := utils.GetCDCChangefeedNameSet(ctx, etcdCLI)
if err != nil {
return err
}
if !nameSet.Empty() {
return errors.Errorf("%splease stop changefeed(s) before restore", nameSet.MessageToUser())
if cfg.CheckRequirements {
nameSet, err := utils.GetCDCChangefeedNameSet(ctx, etcdCLI)
if err != nil {
return err
}
if !nameSet.Empty() {
return errors.Errorf("%splease stop changefeed(s) before restore", nameSet.MessageToUser())
}
}
return nil
}
Expand Down
29 changes: 27 additions & 2 deletions br/pkg/utils/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"regexp"
"strings"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -68,17 +69,40 @@ func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNam
return nil, errors.Trace(err)
}

// cluster id should be valid in
// https://github.com/pingcap/tiflow/blob/ca69c33948bea082aff9f4c0a357ace735b494ed/pkg/config/server_config.go#L218
reg, err := regexp.Compile("^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$")
if err != nil {
log.L().Warn("failed to parse cluster id, skip it", zap.Error(err))
reg = nil
}

for _, kv := range resp.Kvs {
// example: /tidb/cdc/<clusterID>/<namespace>/changefeed/info/<changefeedID>
k := kv.Key[len(CDCPrefix):]
clusterAndNamespace, changefeedID, found := bytes.Cut(k, []byte(ChangefeedPath))
if !found {
continue
}
if !isActiveCDCChangefeed(kv.Value) {
// example: clusterAndNamespace normally is <clusterID>/<namespace>
// but in migration scenario it become __backup__. we need handle it
// see https://github.com/pingcap/tiflow/issues/9807
clusterID, _, found := bytes.Cut(clusterAndNamespace, []byte(`/`))
if !found {
// ignore __backup__ or other formats
continue
}

if reg != nil {
matched := reg.Match(clusterID)
if !matched {
continue
}
if !isActiveCDCChangefeed(kv.Value) {
continue
}
}

nameSet[string(clusterAndNamespace)] = append(nameSet[string(clusterAndNamespace)], string(changefeedID))
}
if len(nameSet) == 0 {
Expand Down Expand Up @@ -119,7 +143,8 @@ func isActiveCDCChangefeed(jsonBytes []byte) bool {
return false
}
switch s.State {
case "normal", "stopped", "error":
// https://docs.pingcap.com/zh/tidb/stable/ticdc-changefeed-overview
case "normal", "stopped", "error", "warning":
return true
default:
return false
Expand Down
17 changes: 17 additions & 0 deletions br/pkg/utils/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,21 @@ func TestGetCDCChangefeedNameSet(t *testing.T) {
require.False(t, nameSet.Empty())
require.Equal(t, "found CDC changefeed(s): cluster/namespace: <nil> changefeed(s): [test], ",
nameSet.MessageToUser())

_, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix())
require.NoError(t, err)

// ignore __backup__ changefeed
checkEtcdPut(
"/tidb/cdc/__backup__/changefeed/info/test",
`{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://[email protected]:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`,
)
// ignore cluster id only changefeed
checkEtcdPut(
"/tidb/cdc/5402613591834624000/changefeed/info/test",
`{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://[email protected]:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`,
)
nameSet, err = utils.GetCDCChangefeedNameSet(ctx, cli)
require.NoError(t, err)
require.True(t, nameSet.Empty())
}
124 changes: 124 additions & 0 deletions executor/importer/precheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,130 @@ func TestPreCheckCollector(t *testing.T) {
c := newPreCheckCollector()
require.True(t, c.success())

<<<<<<< HEAD
c.fail(precheck.CheckTargetTableEmpty, "target table is not empty")
require.False(t, c.success())
=======
func createMockETCD(t *testing.T) (string, *embed.Etcd) {
cfg := embed.NewConfig()
cfg.Dir = t.TempDir()
// rand port in [20000, 60000)
randPort := int(rand.Int31n(40000)) + 20000
clientAddr := fmt.Sprintf(addrFmt, randPort)
lcurl, _ := url.Parse(clientAddr)
cfg.LCUrls, cfg.ACUrls = []url.URL{*lcurl}, []url.URL{*lcurl}
lpurl, _ := url.Parse(fmt.Sprintf(addrFmt, randPort+1))
cfg.LPUrls, cfg.APUrls = []url.URL{*lpurl}, []url.URL{*lpurl}
cfg.InitialCluster = "default=" + lpurl.String()
cfg.Logger = "zap"
embedEtcd, err := embed.StartEtcd(cfg)
require.NoError(t, err)

select {
case <-embedEtcd.Server.ReadyNotify():
case <-time.After(5 * time.Second):
embedEtcd.Server.Stop() // trigger a shutdown
require.False(t, true, "server took too long to start")
}
return clientAddr, embedEtcd
}

func TestCheckRequirements(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
ctx := util.WithInternalSourceType(context.Background(), kv.InternalImportInto)
conn := tk.Session().(sqlexec.SQLExecutor)

_, err := conn.Execute(ctx, "create table test.t(id int primary key)")
require.NoError(t, err)
is := tk.Session().GetDomainInfoSchema().(infoschema.InfoSchema)
tableObj, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)

c := &importer.LoadDataController{
Plan: &importer.Plan{
DBName: "test",
},
Table: tableObj,
}
require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataPreCheckFailed)

// now checkTotalFileSize pass, and try next pre-check item
c.TotalFileSize = 1
// non-empty table
_, err = conn.Execute(ctx, "insert into test.t values(1)")
require.NoError(t, err)
require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataPreCheckFailed)
// table not exists
_, err = conn.Execute(ctx, "drop table if exists test.t")
require.NoError(t, err)
require.ErrorContains(t, c.CheckRequirements(ctx, conn), "doesn't exist")

// create table again, now checkTableEmpty pass
_, err = conn.Execute(ctx, "create table test.t(id int primary key)")
require.NoError(t, err)

clientAddr, embedEtcd := createMockETCD(t)
require.NotNil(t, embedEtcd)
t.Cleanup(func() {
embedEtcd.Close()
})
backup := importer.GetEtcdClient
importer.GetEtcdClient = func() (*etcd.Client, error) {
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{clientAddr},
})
require.NoError(t, err)
return etcd.NewClient(etcdCli, ""), nil
}
t.Cleanup(func() {
importer.GetEtcdClient = backup
})
// mock a PiTR task
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{clientAddr},
})
t.Cleanup(func() {
require.NoError(t, etcdCli.Close())
})
require.NoError(t, err)
pitrKey := streamhelper.PrefixOfTask() + "test"
_, err = etcdCli.Put(ctx, pitrKey, "")
require.NoError(t, err)
err = c.CheckRequirements(ctx, conn)
require.ErrorIs(t, err, exeerrors.ErrLoadDataPreCheckFailed)
require.ErrorContains(t, err, "found PiTR log streaming")

// remove PiTR task, and mock a CDC task
_, err = etcdCli.Delete(ctx, pitrKey)
require.NoError(t, err)
// example: /tidb/cdc/<clusterID>/<namespace>/changefeed/info/<changefeedID>
cdcKey := utils.CDCPrefix + "testcluster/test_ns/changefeed/info/test_cf"
_, err = etcdCli.Put(ctx, cdcKey, `{"state":"normal"}`)
require.NoError(t, err)
err = c.CheckRequirements(ctx, conn)
require.ErrorIs(t, err, exeerrors.ErrLoadDataPreCheckFailed)
require.ErrorContains(t, err, "found CDC changefeed")

// remove CDC task, pass
_, err = etcdCli.Delete(ctx, cdcKey)
require.NoError(t, err)
require.NoError(t, c.CheckRequirements(ctx, conn))

// with global sort
c.Plan.CloudStorageURI = ":"
require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataInvalidURI)
c.Plan.CloudStorageURI = "sdsdsdsd://sdsdsdsd"
require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataInvalidURI)
c.Plan.CloudStorageURI = "local:///tmp"
require.ErrorContains(t, c.CheckRequirements(ctx, conn), "unsupported cloud storage uri scheme: local")
// this mock cannot mock credential check, so we just skip it.
backend := s3mem.New()
faker := gofakes3.New(backend)
ts := httptest.NewServer(faker.Server())
defer ts.Close()
require.NoError(t, backend.CreateBucket("test-bucket"))
c.Plan.CloudStorageURI = fmt.Sprintf("s3://test-bucket/path?region=us-east-1&endpoint=%s&access-key=xxxxxx&secret-access-key=xxxxxx", ts.URL)
require.NoError(t, c.CheckRequirements(ctx, conn))
>>>>>>> 95378e5bede (br: check the correct changefeed info when restore/import data (#47322))
}

0 comments on commit 27a0351

Please sign in to comment.