From 27a0351ca2d7c569b63c4eb91865fd32c34658c7 Mon Sep 17 00:00:00 2001
From: 3pointer
Date: Thu, 28 Sep 2023 18:18:21 +0800
Subject: [PATCH] This is an automated cherry-pick of #47322

Signed-off-by: ti-chi-bot
---
 br/pkg/task/stream.go              |  14 ++--
 br/pkg/utils/cdc.go                |  29 ++++++-
 br/pkg/utils/cdc_test.go           |  17 ++++
 executor/importer/precheck_test.go | 124 +++++++++++++++++++++++++++++
 4 files changed, 176 insertions(+), 8 deletions(-)

diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go
index 0b195128f42a2..f83876a202fb6 100644
--- a/br/pkg/task/stream.go
+++ b/br/pkg/task/stream.go
@@ -1076,12 +1076,14 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3.
 	}
 
 	// check cdc changefeed
-	nameSet, err := utils.GetCDCChangefeedNameSet(ctx, etcdCLI)
-	if err != nil {
-		return err
-	}
-	if !nameSet.Empty() {
-		return errors.Errorf("%splease stop changefeed(s) before restore", nameSet.MessageToUser())
+	if cfg.CheckRequirements {
+		nameSet, err := utils.GetCDCChangefeedNameSet(ctx, etcdCLI)
+		if err != nil {
+			return err
+		}
+		if !nameSet.Empty() {
+			return errors.Errorf("%splease stop changefeed(s) before restore", nameSet.MessageToUser())
+		}
 	}
 	return nil
 }
diff --git a/br/pkg/utils/cdc.go b/br/pkg/utils/cdc.go
index ab655f826fbb0..69e029135879c 100644
--- a/br/pkg/utils/cdc.go
+++ b/br/pkg/utils/cdc.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"regexp"
 	"strings"
 
 	"github.com/pingcap/errors"
@@ -68,6 +69,14 @@ func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNam
 		return nil, errors.Trace(err)
 	}
 
+	// the cluster id should be valid according to
+	// https://github.com/pingcap/tiflow/blob/ca69c33948bea082aff9f4c0a357ace735b494ed/pkg/config/server_config.go#L218
+	reg, err := regexp.Compile("^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$")
+	if err != nil {
+		log.L().Warn("failed to parse cluster id, skip it", zap.Error(err))
+		reg = nil
+	}
+
 	for _, kv := range resp.Kvs {
 		// example: /tidb/cdc/<clusterID>/<namespace>/changefeed/info/<changefeedID>
 		k := kv.Key[len(CDCPrefix):]
@@ -75,10 +84,25 @@ func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNam
 		clusterAndNamespace, changefeedID, found := bytes.Cut(k, []byte(ChangefeedPath))
 		if !found {
 			continue
 		}
-		if !isActiveCDCChangefeed(kv.Value) {
+		// example: clusterAndNamespace normally is <clusterID>/<namespace>,
+		// but in the migration scenario it becomes __backup__<clusterID>.
+		// We need to handle it; see https://github.com/pingcap/tiflow/issues/9807.
+		clusterID, _, found := bytes.Cut(clusterAndNamespace, []byte(`/`))
+		if !found {
+			// ignore __backup__ or other formats
 			continue
 		}
+		if reg != nil {
+			matched := reg.Match(clusterID)
+			if !matched {
+				continue
+			}
+		}
+		if !isActiveCDCChangefeed(kv.Value) {
+			continue
+		}
+
 		nameSet[string(clusterAndNamespace)] = append(nameSet[string(clusterAndNamespace)], string(changefeedID))
 	}
 	if len(nameSet) == 0 {
@@ -119,7 +143,8 @@ func isActiveCDCChangefeed(jsonBytes []byte) bool {
 		return false
 	}
 	switch s.State {
-	case "normal", "stopped", "error":
+	// https://docs.pingcap.com/zh/tidb/stable/ticdc-changefeed-overview
+	case "normal", "stopped", "error", "warning":
 		return true
 	default:
 		return false
diff --git a/br/pkg/utils/cdc_test.go b/br/pkg/utils/cdc_test.go
index 1032693a0d3aa..3b5644c22631d 100644
--- a/br/pkg/utils/cdc_test.go
+++ b/br/pkg/utils/cdc_test.go
@@ -85,4 +85,21 @@ func TestGetCDCChangefeedNameSet(t *testing.T) {
 	require.False(t, nameSet.Empty())
 	require.Equal(t, "found CDC changefeed(s): cluster/namespace: changefeed(s): [test], ",
 		nameSet.MessageToUser())
+
+	_, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix())
+	require.NoError(t, err)
+
+	// ignore __backup__ changefeed
+	checkEtcdPut(
+		"/tidb/cdc/__backup__/changefeed/info/test",
+		`{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`,
+	)
+	// ignore changefeed whose key has a cluster id but no namespace
+	checkEtcdPut(
+		"/tidb/cdc/5402613591834624000/changefeed/info/test",
+		`{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`,
+	)
+	nameSet, err = utils.GetCDCChangefeedNameSet(ctx, cli)
+	require.NoError(t, err)
+	require.True(t, nameSet.Empty())
 }
diff --git a/executor/importer/precheck_test.go b/executor/importer/precheck_test.go
index af8120c62a0af..4f5787907fa44 100644
--- a/executor/importer/precheck_test.go
+++ b/executor/importer/precheck_test.go
@@ -25,6 +25,130 @@ func TestPreCheckCollector(t *testing.T) {
 	c := newPreCheckCollector()
 	require.True(t, c.success())
 
 	c.fail(precheck.CheckTargetTableEmpty, "target table is not empty")
 	require.False(t, c.success())
+}
+
+func createMockETCD(t *testing.T) (string, *embed.Etcd) {
+	cfg := embed.NewConfig()
+	cfg.Dir = t.TempDir()
+	// random port in [20000, 60000)
+	randPort := int(rand.Int31n(40000)) + 20000
+	clientAddr := fmt.Sprintf(addrFmt, randPort)
+	lcurl, _ := url.Parse(clientAddr)
+	cfg.LCUrls, cfg.ACUrls = []url.URL{*lcurl}, []url.URL{*lcurl}
+	lpurl, _ := url.Parse(fmt.Sprintf(addrFmt, randPort+1))
+	cfg.LPUrls, cfg.APUrls = []url.URL{*lpurl}, []url.URL{*lpurl}
+	cfg.InitialCluster = "default=" + lpurl.String()
+	cfg.Logger = "zap"
+	embedEtcd, err := embed.StartEtcd(cfg)
+	require.NoError(t, err)
+
+	select {
+	case <-embedEtcd.Server.ReadyNotify():
+	case <-time.After(5 * time.Second):
+		embedEtcd.Server.Stop() // trigger a shutdown
+		require.False(t, true, "server took too long to start")
+	}
+	return clientAddr, embedEtcd
+}
+
+func TestCheckRequirements(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	ctx := util.WithInternalSourceType(context.Background(), kv.InternalImportInto)
+	conn := tk.Session().(sqlexec.SQLExecutor)
+
+	_, err := conn.Execute(ctx, "create table test.t(id int primary key)")
+	require.NoError(t, err)
+	is := tk.Session().GetDomainInfoSchema().(infoschema.InfoSchema)
+	tableObj, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
+	require.NoError(t, err)
+
+	c := &importer.LoadDataController{
+		Plan: &importer.Plan{
+			DBName: "test",
+		},
+		Table: tableObj,
+	}
+	require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataPreCheckFailed)
+
+	// now checkTotalFileSize passes, try the next pre-check item
+	c.TotalFileSize = 1
+	// non-empty table
+	_, err = conn.Execute(ctx, "insert into test.t values(1)")
+	require.NoError(t, err)
+	require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataPreCheckFailed)
+	// table does not exist
+	_, err = conn.Execute(ctx, "drop table if exists test.t")
+	require.NoError(t, err)
+	require.ErrorContains(t, c.CheckRequirements(ctx, conn), "doesn't exist")
+
+	// create the table again, now checkTableEmpty passes
+	_, err = conn.Execute(ctx, "create table test.t(id int primary key)")
+	require.NoError(t, err)
+
+	clientAddr, embedEtcd := createMockETCD(t)
+	require.NotNil(t, embedEtcd)
+	t.Cleanup(func() {
+		embedEtcd.Close()
+	})
+	backup := importer.GetEtcdClient
+	importer.GetEtcdClient = func() (*etcd.Client, error) {
+		etcdCli, err := clientv3.New(clientv3.Config{
+			Endpoints: []string{clientAddr},
+		})
+		require.NoError(t, err)
+		return etcd.NewClient(etcdCli, ""), nil
+	}
+	t.Cleanup(func() {
+		importer.GetEtcdClient = backup
+	})
+	// mock a PiTR task
+	etcdCli, err := clientv3.New(clientv3.Config{
+		Endpoints: []string{clientAddr},
+	})
+	t.Cleanup(func() {
+		require.NoError(t, etcdCli.Close())
+	})
+	require.NoError(t, err)
+	pitrKey := streamhelper.PrefixOfTask() + "test"
+	_, err = etcdCli.Put(ctx, pitrKey, "")
+	require.NoError(t, err)
+	err = c.CheckRequirements(ctx, conn)
+	require.ErrorIs(t, err, exeerrors.ErrLoadDataPreCheckFailed)
+	require.ErrorContains(t, err, "found PiTR log streaming")
+
+	// remove the PiTR task, and mock a CDC changefeed
+	_, err = etcdCli.Delete(ctx, pitrKey)
+	require.NoError(t, err)
+	// example: /tidb/cdc/<clusterID>/<namespace>/changefeed/info/<changefeedID>
+	cdcKey := utils.CDCPrefix + "testcluster/test_ns/changefeed/info/test_cf"
+	_, err = etcdCli.Put(ctx, cdcKey, `{"state":"normal"}`)
+	require.NoError(t, err)
+	err = c.CheckRequirements(ctx, conn)
+	require.ErrorIs(t, err, exeerrors.ErrLoadDataPreCheckFailed)
+	require.ErrorContains(t, err, "found CDC changefeed")
+
+	// remove the CDC changefeed, the check passes
+	_, err = etcdCli.Delete(ctx, cdcKey)
+	require.NoError(t, err)
+	require.NoError(t, c.CheckRequirements(ctx, conn))
+
+	// with global sort
+	c.Plan.CloudStorageURI = ":"
+	require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataInvalidURI)
+	c.Plan.CloudStorageURI = "sdsdsdsd://sdsdsdsd"
+	require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataInvalidURI)
+	c.Plan.CloudStorageURI = "local:///tmp"
+	require.ErrorContains(t, c.CheckRequirements(ctx, conn), "unsupported cloud storage uri scheme: local")
+	// the fake S3 backend cannot check credentials, so we just skip that part
+	backend := s3mem.New()
+	faker := gofakes3.New(backend)
+	ts := httptest.NewServer(faker.Server())
+	defer ts.Close()
+	require.NoError(t, backend.CreateBucket("test-bucket"))
+	c.Plan.CloudStorageURI = fmt.Sprintf("s3://test-bucket/path?region=us-east-1&endpoint=%s&access-key=xxxxxx&secret-access-key=xxxxxx", ts.URL)
+	require.NoError(t, c.CheckRequirements(ctx, conn))
 }
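
The short Go program below is a minimal, self-contained sketch (not part of the patch) of how the key filtering added to GetCDCChangefeedNameSet treats different etcd keys. Only the key layout and the cluster-id regexp mirror the change above; the helper name changefeedFromKey, the local constants, and the sample keys are illustrative assumptions, and the changefeed-state check (isActiveCDCChangefeed) is omitted here.

package main

import (
	"bytes"
	"fmt"
	"regexp"
)

const (
	// local stand-ins for utils.CDCPrefix and utils.ChangefeedPath
	cdcPrefix      = "/tidb/cdc/"
	changefeedPath = "/changefeed/info/"
)

// validClusterID mirrors the pattern compiled in the patched GetCDCChangefeedNameSet.
var validClusterID = regexp.MustCompile(`^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$`)

// changefeedFromKey (hypothetical helper) extracts cluster/namespace and changefeed ID
// from an etcd key, returning ok=false for keys the patched code ignores: keys without
// a namespace part (e.g. __backup__ keys from a TiCDC migration) or with an invalid
// cluster id.
func changefeedFromKey(key []byte) (clusterAndNamespace, changefeedID string, ok bool) {
	k := bytes.TrimPrefix(key, []byte(cdcPrefix))
	cns, cf, found := bytes.Cut(k, []byte(changefeedPath))
	if !found {
		return "", "", false
	}
	clusterID, _, found := bytes.Cut(cns, []byte("/"))
	if !found || !validClusterID.Match(clusterID) {
		return "", "", false
	}
	return string(cns), string(cf), true
}

func main() {
	keys := []string{
		"/tidb/cdc/testcluster/test_ns/changefeed/info/test_cf", // kept
		"/tidb/cdc/__backup__/changefeed/info/test",             // ignored: no namespace part
		"/tidb/cdc/5402613591834624000/changefeed/info/test",    // ignored: cluster id only
	}
	for _, key := range keys {
		cns, cf, ok := changefeedFromKey([]byte(key))
		fmt.Printf("%-55s kept=%v cluster/namespace=%q changefeed=%q\n", key, ok, cns, cf)
	}
}

Running it prints one line per key and shows that only the testcluster/test_ns key survives the filter, matching the expectations exercised by the cdc_test.go and precheck_test.go changes above.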