From 2d2f4f517a6fc474175377f91599c20af8870781 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Fri, 9 Dec 2022 04:02:04 +0800 Subject: [PATCH] ttl: add methods to split ttl scan tasks (#39627) close pingcap/tidb#39626 --- ttl/cache/BUILD.bazel | 16 + ttl/cache/split_test.go | 811 +++++++++++++++++++++++++++++++++++++ ttl/cache/table.go | 307 ++++++++++++++ ttl/sqlbuilder/sql.go | 20 +- ttl/sqlbuilder/sql_test.go | 36 +- ttl/ttlworker/scan.go | 5 +- ttl/ttlworker/scan_test.go | 14 +- 7 files changed, 1181 insertions(+), 28 deletions(-) create mode 100644 ttl/cache/split_test.go diff --git a/ttl/cache/BUILD.bazel b/ttl/cache/BUILD.bazel index e632f113a580b..32772c0f86de0 100644 --- a/ttl/cache/BUILD.bazel +++ b/ttl/cache/BUILD.bazel @@ -12,16 +12,22 @@ go_library( visibility = ["//visibility:public"], deps = [ "//infoschema", + "//kv", "//parser/ast", "//parser/model", "//parser/mysql", + "//parser/terror", "//sessionctx", "//table/tables", + "//tablecodec", "//ttl/session", "//types", "//util/chunk", + "//util/codec", "//util/logutil", + "//util/mathutil", "@com_github_pingcap_errors//:errors", + "@com_github_tikv_client_go_v2//tikv", "@org_uber_go_zap//:zap", ], ) @@ -32,20 +38,30 @@ go_test( "base_test.go", "infoschema_test.go", "main_test.go", + "split_test.go", "table_test.go", "ttlstatus_test.go", ], embed = [":cache"], flaky = True, deps = [ + "//infoschema", + "//kv", "//parser", "//parser/model", "//server", + "//store/helper", + "//tablecodec", "//testkit", "//testkit/testsetup", "//ttl/session", + "//types", + "//util/codec", + "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_pd_client//:client", "@org_uber_go_goleak//:goleak", ], ) diff --git a/ttl/cache/split_test.go b/ttl/cache/split_test.go new file mode 100644 index 0000000000000..d244e852fb904 --- /dev/null +++ b/ttl/cache/split_test.go @@ -0,0 
+1,811 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache_test + +import ( + "context" + "fmt" + "math" + "sort" + "testing" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/store/helper" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/codec" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikv" + pd "github.com/tikv/pd/client" +) + +func newMockRegion(regionID uint64, startKey []byte, endKey []byte) *pd.Region { + leader := &metapb.Peer{ + Id: regionID, + StoreId: 1, + Role: metapb.PeerRole_Voter, + } + + return &pd.Region{ + Meta: &metapb.Region{ + Id: regionID, + StartKey: startKey, + EndKey: endKey, + Peers: []*metapb.Peer{leader}, + }, + Leader: leader, + } +} + +type mockPDClient struct { + t *testing.T + pd.Client + regions []*pd.Region + regionsSorted bool +} + +func (c *mockPDClient) ScanRegions(_ context.Context, key, endKey []byte, limit int) ([]*pd.Region, error) { + if len(c.regions) == 0 { + return []*pd.Region{newMockRegion(1, []byte{}, []byte{0xFF, 0xFF})}, nil + } + + if !c.regionsSorted { + sort.Slice(c.regions, func(i, j int) bool { + return 
kv.Key(c.regions[i].Meta.StartKey).Cmp(c.regions[j].Meta.StartKey) < 0 + }) + c.regionsSorted = true + } + + regions := []*pd.Region{newMockRegion(1, []byte{}, c.regions[0].Meta.StartKey)} + regions = append(regions, c.regions...) + regions = append(regions, newMockRegion(2, c.regions[len(c.regions)-1].Meta.EndKey, []byte{0xFF, 0xFF, 0xFF})) + + result := make([]*pd.Region, 0) + for _, r := range regions { + if kv.Key(r.Meta.StartKey).Cmp(endKey) >= 0 { + continue + } + + if kv.Key(r.Meta.EndKey).Cmp(key) <= 0 { + continue + } + + if len(result) >= limit { + break + } + + result = append(result, r) + } + return result, nil +} + +func (c *mockPDClient) GetStore(_ context.Context, storeID uint64) (*metapb.Store, error) { + return &metapb.Store{ + Id: storeID, + Address: fmt.Sprintf("127.0.0.%d", storeID), + }, nil +} + +type mockTiKVStore struct { + t *testing.T + helper.Storage + pdClient *mockPDClient + cache *tikv.RegionCache + nextRegionID uint64 +} + +func newMockTiKVStore(t *testing.T) *mockTiKVStore { + pdClient := &mockPDClient{t: t} + s := &mockTiKVStore{ + t: t, + pdClient: pdClient, + cache: tikv.NewRegionCache(pdClient), + nextRegionID: 1000, + } + s.refreshCache() + t.Cleanup(func() { + s.cache.Close() + }) + return s +} + +func (s *mockTiKVStore) addFullTableRegion(tableID ...int64) *mockTiKVStore { + prefix1 := tablecodec.GenTablePrefix(tableID[0]) + prefix2 := tablecodec.GenTablePrefix(tableID[len(tableID)-1]) + return s.addRegion(prefix1, prefix2.PrefixNext()) +} + +func (s *mockTiKVStore) addRegionBeginWithTablePrefix(tableID int64, handle kv.Handle) *mockTiKVStore { + start := tablecodec.GenTablePrefix(tableID) + end := tablecodec.EncodeRowKeyWithHandle(tableID, handle) + return s.addRegion(start, end) +} + +func (s *mockTiKVStore) addRegionEndWithTablePrefix(handle kv.Handle, tableID int64) *mockTiKVStore { + start := tablecodec.EncodeRowKeyWithHandle(tableID, handle) + end := tablecodec.GenTablePrefix(tableID + 1) + return s.addRegion(start, end) 
+} + +func (s *mockTiKVStore) addRegionWithTablePrefix(tableID int64, start kv.Handle, end kv.Handle) *mockTiKVStore { + startKey := tablecodec.EncodeRowKeyWithHandle(tableID, start) + endKey := tablecodec.EncodeRowKeyWithHandle(tableID, end) + return s.addRegion(startKey, endKey) +} + +func (s *mockTiKVStore) addRegion(key, endKey []byte) *mockTiKVStore { + require.True(s.t, kv.Key(endKey).Cmp(key) > 0) + if len(s.pdClient.regions) > 0 { + lastRegion := s.pdClient.regions[len(s.pdClient.regions)-1] + require.True(s.t, kv.Key(endKey).Cmp(lastRegion.Meta.EndKey) >= 0) + } + + regionID := s.nextRegionID + s.nextRegionID++ + leader := &metapb.Peer{ + Id: regionID, + StoreId: 1, + Role: metapb.PeerRole_Voter, + } + + s.pdClient.regions = append(s.pdClient.regions, &pd.Region{ + Meta: &metapb.Region{ + Id: regionID, + StartKey: key, + EndKey: endKey, + Peers: []*metapb.Peer{leader}, + }, + Leader: leader, + }) + + s.pdClient.regionsSorted = false + s.refreshCache() + return s +} + +func (s *mockTiKVStore) refreshCache() { + _, err := s.cache.LoadRegionsInKeyRange( + tikv.NewBackofferWithVars(context.Background(), 1000, nil), + []byte{}, + []byte{0xFF}, + ) + require.NoError(s.t, err) +} + +func (s *mockTiKVStore) batchAddIntHandleRegions(tblID int64, regionCnt int, regionSize int, offset int64) (end kv.IntHandle) { + for i := 0; i < regionCnt; i++ { + start := kv.IntHandle(offset + int64(i*regionSize)) + end = kv.IntHandle(start.IntValue() + int64(regionSize)) + s.addRegionWithTablePrefix(tblID, start, end) + } + return +} + +func (s *mockTiKVStore) clearRegions() { + s.pdClient.regions = nil + s.cache.Close() + s.cache = tikv.NewRegionCache(s.pdClient) + s.refreshCache() +} + +func (s *mockTiKVStore) newCommonHandle(ds ...types.Datum) *kv.CommonHandle { + encoded, err := codec.EncodeKey(nil, nil, ds...) 
+ require.NoError(s.t, err) + h, err := kv.NewCommonHandle(encoded) + require.NoError(s.t, err) + return h +} + +func (s *mockTiKVStore) GetRegionCache() *tikv.RegionCache { + return s.cache +} + +func bytesHandle(t *testing.T, data []byte) kv.Handle { + encoded, err := codec.EncodeKey(nil, nil, types.NewBytesDatum(data)) + require.NoError(t, err) + h, err := kv.NewCommonHandle(encoded) + require.NoError(t, err) + return h +} + +func createTTLTable(t *testing.T, tk *testkit.TestKit, name string, option string) *cache.PhysicalTable { + if option == "" { + return createTTLTableWithSQL(t, tk, name, fmt.Sprintf("create table test.%s(t timestamp) TTL = `t` + interval 1 day", name)) + } + + return createTTLTableWithSQL(t, tk, name, fmt.Sprintf("create table test.%s(id %s primary key, t timestamp) TTL = `t` + interval 1 day", name, option)) +} + +func createTTLTableWithSQL(t *testing.T, tk *testkit.TestKit, name string, sql string) *cache.PhysicalTable { + tk.MustExec(sql) + is, ok := tk.Session().GetDomainInfoSchema().(infoschema.InfoSchema) + require.True(t, ok) + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr(name)) + require.NoError(t, err) + ttlTbl, err := cache.NewPhysicalTable(model.NewCIStr("test"), tbl.Meta(), model.NewCIStr("")) + require.NoError(t, err) + return ttlTbl +} + +func checkRange(t *testing.T, r cache.ScanRange, start, end types.Datum) { + if start.IsNull() { + require.Nil(t, r.Start) + } else { + require.Equal(t, 1, len(r.Start)) + require.Equal(t, start.Kind(), r.Start[0].Kind()) + require.Equal(t, start.GetValue(), r.Start[0].GetValue()) + } + + if end.IsNull() { + require.Nil(t, r.End) + } else { + require.Equal(t, 1, len(r.End)) + require.Equal(t, end.Kind(), r.End[0].Kind()) + require.Equal(t, end.GetValue(), r.End[0].GetValue()) + } +} + +func TestSplitTTLScanRangesWithSignedInt(t *testing.T) { + parser.TTLFeatureGate = true + defer func() { + parser.TTLFeatureGate = false + }() + + store := testkit.CreateMockStore(t) + tk 
:= testkit.NewTestKit(t, store) + + tbls := []*cache.PhysicalTable{ + createTTLTable(t, tk, "t1", "tinyint"), + createTTLTable(t, tk, "t2", "smallint"), + createTTLTable(t, tk, "t3", "mediumint"), + createTTLTable(t, tk, "t4", "int"), + createTTLTable(t, tk, "t5", "bigint"), + createTTLTable(t, tk, "t6", ""), // no clustered + } + + tikvStore := newMockTiKVStore(t) + for _, tbl := range tbls { + // test only one region + tikvStore.clearRegions() + ranges, err := tbl.SplitScanRanges(context.TODO(), tikvStore, 4) + require.NoError(t, err) + require.Equal(t, 1, len(ranges)) + checkRange(t, ranges[0], types.Datum{}, types.Datum{}) + + // test share regions with other table + tikvStore.clearRegions() + tikvStore.addRegion( + tablecodec.GenTablePrefix(tbl.ID-1), + tablecodec.GenTablePrefix(tbl.ID+1), + ) + ranges, err = tbl.SplitScanRanges(context.TODO(), tikvStore, 4) + require.NoError(t, err) + require.Equal(t, 1, len(ranges)) + checkRange(t, ranges[0], types.Datum{}, types.Datum{}) + + // test one table has multiple regions + tikvStore.clearRegions() + tikvStore.addRegionBeginWithTablePrefix(tbl.ID, kv.IntHandle(0)) + end := tikvStore.batchAddIntHandleRegions(tbl.ID, 8, 100, 0) + tikvStore.addRegionEndWithTablePrefix(end, tbl.ID) + ranges, err = tbl.SplitScanRanges(context.TODO(), tikvStore, 4) + require.NoError(t, err) + require.Equal(t, 4, len(ranges)) + checkRange(t, ranges[0], types.Datum{}, types.NewIntDatum(200)) + checkRange(t, ranges[1], types.NewIntDatum(200), types.NewIntDatum(500)) + checkRange(t, ranges[2], types.NewIntDatum(500), types.NewIntDatum(700)) + checkRange(t, ranges[3], types.NewIntDatum(700), types.Datum{}) + + // test one table has multiple regions and one table region across 0 + tikvStore.clearRegions() + tikvStore.addRegionBeginWithTablePrefix(tbl.ID, kv.IntHandle(-350)) + end = tikvStore.batchAddIntHandleRegions(tbl.ID, 8, 100, -350) + tikvStore.addRegionEndWithTablePrefix(end, tbl.ID) + ranges, err = tbl.SplitScanRanges(context.TODO(), 
tikvStore, 5) + require.NoError(t, err) + require.Equal(t, 5, len(ranges)) + checkRange(t, ranges[0], types.Datum{}, types.NewIntDatum(-250)) + checkRange(t, ranges[1], types.NewIntDatum(-250), types.NewIntDatum(-50)) + checkRange(t, ranges[2], types.NewIntDatum(-50), types.NewIntDatum(150)) + checkRange(t, ranges[3], types.NewIntDatum(150), types.NewIntDatum(350)) + checkRange(t, ranges[4], types.NewIntDatum(350), types.Datum{}) + } +} + +func TestSplitTTLScanRangesWithUnsignedInt(t *testing.T) { + parser.TTLFeatureGate = true + defer func() { + parser.TTLFeatureGate = false + }() + + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tbls := []*cache.PhysicalTable{ + createTTLTable(t, tk, "t1", "tinyint unsigned"), + createTTLTable(t, tk, "t2", "smallint unsigned"), + createTTLTable(t, tk, "t3", "mediumint unsigned"), + createTTLTable(t, tk, "t4", "int unsigned"), + createTTLTable(t, tk, "t5", "bigint unsigned"), + } + + tikvStore := newMockTiKVStore(t) + for _, tbl := range tbls { + // test only one region + tikvStore.clearRegions() + ranges, err := tbl.SplitScanRanges(context.TODO(), tikvStore, 4) + require.NoError(t, err) + require.Equal(t, 1, len(ranges)) + checkRange(t, ranges[0], types.Datum{}, types.Datum{}) + + // test share regions with other table + tikvStore.clearRegions() + tikvStore.addRegion( + tablecodec.GenTablePrefix(tbl.ID-1), + tablecodec.GenTablePrefix(tbl.ID+1), + ) + ranges, err = tbl.SplitScanRanges(context.TODO(), tikvStore, 4) + require.NoError(t, err) + require.Equal(t, 1, len(ranges)) + checkRange(t, ranges[0], types.Datum{}, types.Datum{}) + + // test one table has multiple regions: [MinInt64, a) [a, b) [b, 0) [0, c) [c, d) [d, MaxInt64] + tikvStore.clearRegions() + tikvStore.addRegionBeginWithTablePrefix(tbl.ID, kv.IntHandle(-200)) + end := tikvStore.batchAddIntHandleRegions(tbl.ID, 4, 100, -200) + tikvStore.addRegionEndWithTablePrefix(end, tbl.ID) + ranges, err = tbl.SplitScanRanges(context.TODO(), 
tikvStore, 6) + require.NoError(t, err) + require.Equal(t, 6, len(ranges)) + checkRange(t, ranges[0], types.NewUintDatum(uint64(math.MaxInt64)+1), types.NewUintDatum(uint64(math.MaxUint64)-199)) + checkRange(t, ranges[1], types.NewUintDatum(uint64(math.MaxUint64)-199), types.NewUintDatum(uint64(math.MaxUint64)-99)) + checkRange(t, ranges[2], types.NewUintDatum(uint64(math.MaxUint64)-99), types.Datum{}) + checkRange(t, ranges[3], types.Datum{}, types.NewUintDatum(100)) + checkRange(t, ranges[4], types.NewUintDatum(100), types.NewUintDatum(200)) + checkRange(t, ranges[5], types.NewUintDatum(200), types.NewUintDatum(uint64(math.MaxInt64)+1)) + + // test one table has multiple regions: [MinInt64, a) [a, b) [b, c) [c, d) [d, MaxInt64], b < 0 < c + tikvStore.clearRegions() + tikvStore.addRegionBeginWithTablePrefix(tbl.ID, kv.IntHandle(-150)) + end = tikvStore.batchAddIntHandleRegions(tbl.ID, 3, 100, -150) + tikvStore.addRegionEndWithTablePrefix(end, tbl.ID) + ranges, err = tbl.SplitScanRanges(context.TODO(), tikvStore, 5) + require.NoError(t, err) + require.Equal(t, 6, len(ranges)) + checkRange(t, ranges[0], types.NewUintDatum(uint64(math.MaxInt64)+1), types.NewUintDatum(uint64(math.MaxUint64)-149)) + checkRange(t, ranges[1], types.NewUintDatum(uint64(math.MaxUint64)-149), types.NewUintDatum(uint64(math.MaxUint64)-49)) + checkRange(t, ranges[2], types.NewUintDatum(uint64(math.MaxUint64)-49), types.Datum{}) + checkRange(t, ranges[3], types.Datum{}, types.NewUintDatum(50)) + checkRange(t, ranges[4], types.NewUintDatum(50), types.NewUintDatum(150)) + checkRange(t, ranges[5], types.NewUintDatum(150), types.NewUintDatum(uint64(math.MaxInt64)+1)) + } +} + +func TestSplitTTLScanRangesWithBytes(t *testing.T) { + parser.TTLFeatureGate = true + defer func() { + parser.TTLFeatureGate = false + }() + + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tbls := []*cache.PhysicalTable{ + createTTLTable(t, tk, "t1", "binary(32)"), + createTTLTable(t, tk, "t2", 
"char(32) CHARACTER SET BINARY"), + createTTLTable(t, tk, "t3", "varchar(32) CHARACTER SET BINARY"), + createTTLTable(t, tk, "t4", "bit(32)"), + } + + tikvStore := newMockTiKVStore(t) + for _, tbl := range tbls { + // test only one region + tikvStore.clearRegions() + ranges, err := tbl.SplitScanRanges(context.TODO(), tikvStore, 4) + require.NoError(t, err) + require.Equal(t, 1, len(ranges)) + checkRange(t, ranges[0], types.Datum{}, types.Datum{}) + + // test share regions with other table + tikvStore.clearRegions() + tikvStore.addRegion( + tablecodec.GenTablePrefix(tbl.ID-1), + tablecodec.GenTablePrefix(tbl.ID+1), + ) + ranges, err = tbl.SplitScanRanges(context.TODO(), tikvStore, 4) + require.NoError(t, err) + require.Equal(t, 1, len(ranges)) + checkRange(t, ranges[0], types.Datum{}, types.Datum{}) + + // test one table has multiple regions + tikvStore.clearRegions() + tikvStore.addRegionBeginWithTablePrefix(tbl.ID, bytesHandle(t, []byte{1, 2, 3})) + tikvStore.addRegionWithTablePrefix(tbl.ID, bytesHandle(t, []byte{1, 2, 3}), bytesHandle(t, []byte{1, 2, 3, 4})) + tikvStore.addRegionWithTablePrefix(tbl.ID, bytesHandle(t, []byte{1, 2, 3, 4}), bytesHandle(t, []byte{1, 2, 3, 4, 5})) + tikvStore.addRegionWithTablePrefix(tbl.ID, bytesHandle(t, []byte{1, 2, 3, 4, 5}), bytesHandle(t, []byte{1, 2, 4})) + tikvStore.addRegionWithTablePrefix(tbl.ID, bytesHandle(t, []byte{1, 2, 4}), bytesHandle(t, []byte{1, 2, 5})) + tikvStore.addRegionEndWithTablePrefix(bytesHandle(t, []byte{1, 2, 5}), tbl.ID) + ranges, err = tbl.SplitScanRanges(context.TODO(), tikvStore, 4) + require.NoError(t, err) + require.Equal(t, 4, len(ranges)) + checkRange(t, ranges[0], types.Datum{}, types.NewBytesDatum([]byte{1, 2, 3, 4})) + checkRange(t, ranges[1], types.NewBytesDatum([]byte{1, 2, 3, 4}), types.NewBytesDatum([]byte{1, 2, 4})) + checkRange(t, ranges[2], types.NewBytesDatum([]byte{1, 2, 4}), types.NewBytesDatum([]byte{1, 2, 5})) + checkRange(t, ranges[3], types.NewBytesDatum([]byte{1, 2, 5}), 
types.Datum{}) + } +} + +func TestNoTTLSplitSupportTables(t *testing.T) { + parser.TTLFeatureGate = true + defer func() { + parser.TTLFeatureGate = false + }() + + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tbls := []*cache.PhysicalTable{ + createTTLTable(t, tk, "t1", "char(32) CHARACTER SET UTF8MB4"), + createTTLTable(t, tk, "t2", "varchar(32) CHARACTER SET UTF8MB4"), + createTTLTable(t, tk, "t3", "double"), + createTTLTable(t, tk, "t4", "decimal(32, 2)"), + } + + tikvStore := newMockTiKVStore(t) + for _, tbl := range tbls { + // test only one region + tikvStore.clearRegions() + ranges, err := tbl.SplitScanRanges(context.TODO(), tikvStore, 4) + require.NoError(t, err) + require.Equal(t, 1, len(ranges)) + checkRange(t, ranges[0], types.Datum{}, types.Datum{}) + + // test share regions with other table + tikvStore.clearRegions() + tikvStore.addRegion( + tablecodec.GenTablePrefix(tbl.ID-1), + tablecodec.GenTablePrefix(tbl.ID+1), + ) + ranges, err = tbl.SplitScanRanges(context.TODO(), tikvStore, 4) + require.NoError(t, err) + require.Equal(t, 1, len(ranges)) + checkRange(t, ranges[0], types.Datum{}, types.Datum{}) + + // test one table has multiple regions + tikvStore.clearRegions() + tikvStore.addRegionBeginWithTablePrefix(tbl.ID, bytesHandle(t, []byte{1, 2, 3})) + tikvStore.addRegionWithTablePrefix(tbl.ID, bytesHandle(t, []byte{1, 2, 3}), bytesHandle(t, []byte{1, 2, 3, 4})) + tikvStore.addRegionEndWithTablePrefix(bytesHandle(t, []byte{1, 2, 3, 4}), tbl.ID) + ranges, err = tbl.SplitScanRanges(context.TODO(), tikvStore, 3) + require.NoError(t, err) + require.Equal(t, 1, len(ranges)) + checkRange(t, ranges[0], types.Datum{}, types.Datum{}) + } +} + +func TestGetNextBytesHandleDatum(t *testing.T) { + tblID := int64(7) + buildHandleBytes := func(data []byte) []byte { + handleBytes, err := codec.EncodeKey(nil, nil, types.NewBytesDatum(data)) + require.NoError(t, err) + return handleBytes + } + + buildRowKey := func(handleBytes []byte) 
kv.Key { + return tablecodec.EncodeRowKey(tblID, handleBytes) + } + + buildBytesRowKey := func(data []byte) kv.Key { + return buildRowKey(buildHandleBytes(data)) + } + + binaryDataStartPos := len(tablecodec.GenTableRecordPrefix(tblID)) + 1 + cases := []struct { + key interface{} + result []byte + isNull bool + }{ + { + key: buildBytesRowKey([]byte{}), + result: []byte{}, + }, + { + key: buildBytesRowKey([]byte{1, 2, 3}), + result: []byte{1, 2, 3}, + }, + { + key: buildBytesRowKey([]byte{1, 2, 3, 0}), + result: []byte{1, 2, 3, 0}, + }, + { + key: buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 8}), + result: []byte{1, 2, 3, 4, 5, 6, 7, 8}, + }, + { + key: buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9}), + result: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + { + key: buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 8, 0}), + result: []byte{1, 2, 3, 4, 5, 6, 7, 8, 0}, + }, + { + key: []byte{}, + result: []byte{}, + }, + { + key: tablecodec.GenTableRecordPrefix(tblID), + result: []byte{}, + }, + { + key: tablecodec.GenTableRecordPrefix(tblID - 1), + result: []byte{}, + }, + { + key: tablecodec.GenTablePrefix(tblID).PrefixNext(), + isNull: true, + }, + { + key: buildRowKey([]byte{0}), + result: []byte{}, + }, + { + key: buildRowKey([]byte{1}), + result: []byte{}, + }, + { + key: buildRowKey([]byte{2}), + isNull: true, + }, + { + // recordPrefix + bytesFlag + [0] + key: buildBytesRowKey([]byte{})[:binaryDataStartPos+1], + result: []byte{}, + }, + { + // recordPrefix + bytesFlag + [0, 0, 0, 0, 0, 0, 0, 0] + key: buildBytesRowKey([]byte{})[:binaryDataStartPos+8], + result: []byte{}, + }, + { + // recordPrefix + bytesFlag + [1] + key: buildBytesRowKey([]byte{1, 2, 3})[:binaryDataStartPos+1], + result: []byte{1}, + }, + { + // recordPrefix + bytesFlag + [1, 2, 3] + key: buildBytesRowKey([]byte{1, 2, 3})[:binaryDataStartPos+3], + result: []byte{1, 2, 3}, + }, + { + // recordPrefix + bytesFlag + [1, 2, 3, 0] + key: buildBytesRowKey([]byte{1, 2, 3})[:binaryDataStartPos+4], + result: 
[]byte{1, 2, 3}, + }, + { + // recordPrefix + bytesFlag + [1, 2, 3, 0, 0, 0, 0, 0, 247] + key: func() []byte { + bs := buildBytesRowKey([]byte{1, 2, 3}) + bs[len(bs)-1] = 247 + return bs + }, + result: []byte{1, 2, 3}, + }, + { + // recordPrefix + bytesFlag + [1, 2, 3, 0, 0, 0, 0, 0, 0] + key: func() []byte { + bs := buildBytesRowKey([]byte{1, 2, 3}) + bs[len(bs)-1] = 0 + return bs + }, + result: []byte{1, 2, 3}, + }, + { + // recordPrefix + bytesFlag + [1, 2, 3, 4, 5, 6, 7, 8, 254, 9, 0, 0, 0, 0, 0, 0, 0, 248] + key: func() []byte { + bs := buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9}) + bs[len(bs)-10] = 254 + return bs + }, + result: []byte{1, 2, 3, 4, 5, 6, 7, 8}, + }, + { + // recordPrefix + bytesFlag + [1, 2, 3, 4, 5, 6, 7, 0, 254, 9, 0, 0, 0, 0, 0, 0, 0, 248] + key: func() []byte { + bs := buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 0, 9}) + bs[len(bs)-10] = 254 + return bs + }, + result: []byte{1, 2, 3, 4, 5, 6, 7}, + }, + { + // recordPrefix + bytesFlag + [1, 2, 3, 4, 5, 6, 7, 0, 253, 9, 0, 0, 0, 0, 0, 0, 0, 248] + key: func() []byte { + bs := buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 0, 9}) + bs[len(bs)-10] = 253 + return bs + }, + result: []byte{1, 2, 3, 4, 5, 6, 7}, + }, + { + // recordPrefix + bytesFlag + [1, 2, 3, 4, 5, 6, 7, 8, 255, 9, 0, 0, 0, 0, 0, 0, 0, 247] + key: func() []byte { + bs := buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9}) + bs[len(bs)-1] = 247 + return bs + }, + result: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + { + // recordPrefix + bytesFlag + [1, 2, 3, 4, 5, 6, 7, 8, 255, 9, 0, 0, 0, 0, 0, 0, 0, 0] + key: func() []byte { + bs := buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9}) + bs[len(bs)-1] = 0 + return bs + }, + result: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + { + // recordPrefix + bytesFlag + [1, 2, 3, 4, 5, 6, 7, 8, 255, 9, 0, 0, 0, 0, 0, 0, 0] + key: func() []byte { + bs := buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9}) + bs = bs[:len(bs)-1] + return bs + }, + result: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9}, + 
}, + { + // recordPrefix + bytesFlag + [1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0] + key: func() []byte { + bs := buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 8}) + bs = bs[:len(bs)-1] + return bs + }, + result: []byte{1, 2, 3, 4, 5, 6, 7, 8}, + }, + { + // recordPrefix + bytesFlag + [1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 246] + key: func() []byte { + bs := buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 8}) + bs = bs[:len(bs)-1] + return bs + }, + result: []byte{1, 2, 3, 4, 5, 6, 7, 8}, + }, + } + + for i, c := range cases { + var key kv.Key + switch k := c.key.(type) { + case kv.Key: + key = k + case []byte: + key = k + case func() []byte: + key = k() + case func() kv.Key: + key = k() + default: + require.FailNow(t, "%d", i) + } + + d := cache.GetNextBytesHandleDatum(key, tablecodec.GenTableRecordPrefix(tblID)) + if c.isNull { + require.True(t, d.IsNull(), i) + } else { + require.Equal(t, types.KindBytes, d.Kind(), i) + require.Equal(t, c.result, d.GetBytes(), i) + } + } +} +func TestGetNextIntHandle(t *testing.T) { + tblID := int64(7) + cases := []struct { + key interface{} + result int64 + isNull bool + }{ + { + key: tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(0)), + result: 0, + }, + { + key: tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(3)), + result: 3, + }, + { + key: tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(math.MaxInt64)), + result: math.MaxInt64, + }, + { + key: tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(math.MinInt64)), + result: math.MinInt64, + }, + { + key: []byte{}, + result: math.MinInt64, + }, + { + key: tablecodec.GenTableRecordPrefix(tblID), + result: math.MinInt64, + }, + { + key: tablecodec.GenTableRecordPrefix(tblID - 1), + result: math.MinInt64, + }, + { + key: tablecodec.GenTablePrefix(tblID).PrefixNext(), + isNull: true, + }, + { + key: tablecodec.EncodeRowKey(tblID, []byte{0}), + result: codec.DecodeCmpUintToInt(0), + }, + { + key: tablecodec.EncodeRowKey(tblID, []byte{0, 
1, 2, 3}), + result: codec.DecodeCmpUintToInt(0x0001020300000000), + }, + { + key: tablecodec.EncodeRowKey(tblID, []byte{8, 1, 2, 3}), + result: codec.DecodeCmpUintToInt(0x0801020300000000), + }, + { + key: tablecodec.EncodeRowKey(tblID, []byte{0, 1, 2, 3, 4, 5, 6, 7, 0}), + result: codec.DecodeCmpUintToInt(0x0001020304050607) + 1, + }, + { + key: tablecodec.EncodeRowKey(tblID, []byte{8, 1, 2, 3, 4, 5, 6, 7, 0}), + result: codec.DecodeCmpUintToInt(0x0801020304050607) + 1, + }, + { + key: tablecodec.EncodeRowKey(tblID, []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}), + result: math.MaxInt64, + }, + { + key: tablecodec.EncodeRowKey(tblID, []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0}), + isNull: true, + }, + } + + for i, c := range cases { + var key kv.Key + switch k := c.key.(type) { + case kv.Key: + key = k + case []byte: + key = k + case func() []byte: + key = k() + case func() kv.Key: + key = k() + default: + require.FailNow(t, "%d", i) + } + + v := cache.GetNextIntHandle(key, tablecodec.GenTableRecordPrefix(tblID)) + if c.isNull { + require.Nil(t, v, i) + } else { + require.IsType(t, kv.IntHandle(0), v, i) + require.Equal(t, c.result, v.IntValue()) + } + } +} diff --git a/ttl/cache/table.go b/ttl/cache/table.go index e1637ac33906e..ec64cade3cf5e 100644 --- a/ttl/cache/table.go +++ b/ttl/cache/table.go @@ -16,17 +16,25 @@ package cache import ( "context" + "encoding/binary" "fmt" + "math" "time" "github.com/pingcap/errors" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/ttl/session" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/mathutil" + "github.com/tikv/client-go/v2/tikv" ) func getTableKeyColumns(tbl 
*model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType, error) { @@ -54,6 +62,32 @@ func getTableKeyColumns(tbl *model.TableInfo) ([]*model.ColumnInfo, []*types.Fie return []*model.ColumnInfo{extraHandleColInfo}, []*types.FieldType{&extraHandleColInfo.FieldType}, nil } +// ScanRange is the range to scan. The range is: [Start, End) +type ScanRange struct { + Start []types.Datum + End []types.Datum +} + +func newFullRange() ScanRange { + return ScanRange{} +} + +func newDatumRange(start types.Datum, end types.Datum) (r ScanRange) { + if !start.IsNull() { + r.Start = []types.Datum{start} + } + if !end.IsNull() { + r.End = []types.Datum{end} + } + return r +} + +func nullDatum() types.Datum { + d := types.Datum{} + d.SetNull() + return d +} + // PhysicalTable is used to provide some information for a physical table in TTL job type PhysicalTable struct { // ID is the physical ID of the table @@ -161,3 +195,276 @@ func (t *PhysicalTable) EvalExpireTime(ctx context.Context, se session.Session, tm := rows[0].GetTime(0) return tm.CoreTime().GoTime(tz) } + +// SplitScanRanges split ranges for TTL scan +func (t *PhysicalTable) SplitScanRanges(ctx context.Context, store kv.Storage, splitCnt int) ([]ScanRange, error) { + if len(t.KeyColumns) != 1 || splitCnt <= 1 { + return []ScanRange{newFullRange()}, nil + } + + tikvStore, ok := store.(tikv.Storage) + if !ok { + return []ScanRange{newFullRange()}, nil + } + + ft := t.KeyColumns[0].FieldType + switch ft.GetType() { + case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24: + return t.splitIntRanges(ctx, tikvStore, splitCnt) + case mysql.TypeBit: + return t.splitBinaryRanges(ctx, tikvStore, splitCnt) + case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar: + if mysql.HasBinaryFlag(ft.GetFlag()) { + return t.splitBinaryRanges(ctx, tikvStore, splitCnt) + } + } + return []ScanRange{newFullRange()}, nil +} + +func unsignedEdge(d types.Datum) types.Datum { + if d.IsNull() { + return 
types.NewUintDatum(uint64(math.MaxInt64 + 1)) + } + if d.GetInt64() == 0 { + return nullDatum() + } + return types.NewUintDatum(uint64(d.GetInt64())) +} + +func (t *PhysicalTable) splitIntRanges(ctx context.Context, store tikv.Storage, splitCnt int) ([]ScanRange, error) { + recordPrefix := tablecodec.GenTableRecordPrefix(t.ID) + startKey, endKey := tablecodec.GetTableHandleKeyRange(t.ID) + keyRanges, err := t.splitRawKeyRanges(ctx, store, startKey, endKey, splitCnt) + if err != nil { + return nil, err + } + + if len(keyRanges) <= 1 { + return []ScanRange{newFullRange()}, nil + } + + ft := t.KeyColumnTypes[0] + unsigned := mysql.HasUnsignedFlag(ft.GetFlag()) + scanRanges := make([]ScanRange, 0, len(keyRanges)+1) + curScanStart := nullDatum() + for i, keyRange := range keyRanges { + if i != 0 && curScanStart.IsNull() { + break + } + + curScanEnd := nullDatum() + if i < len(keyRanges)-1 { + if val := GetNextIntHandle(keyRange.EndKey, recordPrefix); val != nil { + curScanEnd = types.NewIntDatum(val.IntValue()) + } + } + + if !curScanStart.IsNull() && !curScanEnd.IsNull() && curScanStart.GetInt64() >= curScanEnd.GetInt64() { + continue + } + + if !unsigned { + // primary key is signed, so the range can be used as is + scanRanges = append(scanRanges, newDatumRange(curScanStart, curScanEnd)) + } else if !curScanStart.IsNull() && curScanStart.GetInt64() >= 0 { + // primary key is unsigned and range is in the right half side + scanRanges = append(scanRanges, newDatumRange(unsignedEdge(curScanStart), unsignedEdge(curScanEnd))) + } else if !curScanEnd.IsNull() && curScanEnd.GetInt64() <= 0 { + // primary key is unsigned and range is in the left half side + scanRanges = append(scanRanges, newDatumRange(unsignedEdge(curScanStart), unsignedEdge(curScanEnd))) + } else { + // primary key is unsigned and the start > math.MaxInt64 && end < math.MaxInt64 + // we must split it into two ranges + scanRanges = append(scanRanges, + newDatumRange(unsignedEdge(curScanStart), nullDatum()), + 
newDatumRange(nullDatum(), unsignedEdge(curScanEnd)), + ) + } + curScanStart = curScanEnd + } + return scanRanges, nil +} + +func (t *PhysicalTable) splitBinaryRanges(ctx context.Context, store tikv.Storage, splitCnt int) ([]ScanRange, error) { + recordPrefix := tablecodec.GenTableRecordPrefix(t.ID) + startKey, endKey := recordPrefix, recordPrefix.PrefixNext() + keyRanges, err := t.splitRawKeyRanges(ctx, store, startKey, endKey, splitCnt) + if err != nil { + return nil, err + } + + if len(keyRanges) <= 1 { + return []ScanRange{newFullRange()}, nil + } + + scanRanges := make([]ScanRange, 0, len(keyRanges)) + curScanStart := nullDatum() + for i, keyRange := range keyRanges { + if i != 0 && curScanStart.IsNull() { + break + } + + curScanEnd := nullDatum() + if i != len(keyRanges)-1 { + curScanEnd = GetNextBytesHandleDatum(keyRange.EndKey, recordPrefix) + } + + if !curScanStart.IsNull() && !curScanEnd.IsNull() && kv.Key(curScanStart.GetBytes()).Cmp(curScanEnd.GetBytes()) >= 0 { + continue + } + + scanRanges = append(scanRanges, newDatumRange(curScanStart, curScanEnd)) + curScanStart = curScanEnd + } + return scanRanges, nil +} + +func (t *PhysicalTable) splitRawKeyRanges(ctx context.Context, store tikv.Storage, startKey, endKey kv.Key, splitCnt int) ([]kv.KeyRange, error) { + regionCache := store.GetRegionCache() + regionIDs, err := regionCache.ListRegionIDsInKeyRange(tikv.NewBackofferWithVars(ctx, 20000, nil), startKey, endKey) + if err != nil { + return nil, err + } + + regionsPerRange := len(regionIDs) / splitCnt + oversizeCnt := len(regionIDs) % splitCnt + ranges := make([]kv.KeyRange, 0, mathutil.Min(len(regionIDs), splitCnt)) + for len(regionIDs) > 0 { + startRegion, err := regionCache.LocateRegionByID(tikv.NewBackofferWithVars(ctx, 20000, nil), regionIDs[0]) + if err != nil { + return nil, err + } + + endRegionIdx := regionsPerRange - 1 + if oversizeCnt > 0 { + endRegionIdx++ + } + + endRegion, err := regionCache.LocateRegionByID(tikv.NewBackofferWithVars(ctx, 
20000, nil), regionIDs[endRegionIdx]) + if err != nil { + return nil, err + } + + rangeStartKey := kv.Key(startRegion.StartKey) + if rangeStartKey.Cmp(startKey) < 0 { + rangeStartKey = startKey + } + + rangeEndKey := kv.Key(endRegion.EndKey) + if rangeEndKey.Cmp(endKey) > 0 { + rangeEndKey = endKey + } + + ranges = append(ranges, kv.KeyRange{StartKey: rangeStartKey, EndKey: rangeEndKey}) + oversizeCnt-- + regionIDs = regionIDs[endRegionIdx+1:] + } + return ranges, nil +} + +var emptyBytesHandleKey kv.Key + +func init() { + key, err := codec.EncodeKey(nil, nil, types.NewBytesDatum(nil)) + terror.MustNil(err) + emptyBytesHandleKey = key +} + +// GetNextIntHandle is used for int handle tables. It returns the min handle whose encoded key is at or after the argument `key`. +// If it cannot find a valid value, nil will be returned. +func GetNextIntHandle(key kv.Key, recordPrefix []byte) kv.Handle { + if key.Cmp(recordPrefix) > 0 && !key.HasPrefix(recordPrefix) { + return nil + } + + if key.Cmp(recordPrefix) <= 0 { + return kv.IntHandle(math.MinInt64) + } + + suffix := key[len(recordPrefix):] + encodedVal := suffix + if len(suffix) < 8 { + encodedVal = make([]byte, 8) + copy(encodedVal, suffix) + } + + findNext := false + if len(suffix) > 8 { + findNext = true + encodedVal = encodedVal[:8] + } + + u := codec.DecodeCmpUintToInt(binary.BigEndian.Uint64(encodedVal)) + if !findNext { + return kv.IntHandle(u) + } + + if u == math.MaxInt64 { + return nil + } + + return kv.IntHandle(u + 1) +} + +// GetNextBytesHandleDatum is used for a table with one binary or string column common handle. +// It returns the min value whose encoded key is at or after the argument `key`. +// If it cannot find a valid value, a null datum will be returned.
+func GetNextBytesHandleDatum(key kv.Key, recordPrefix []byte) (d types.Datum) { + if key.Cmp(recordPrefix) > 0 && !key.HasPrefix(recordPrefix) { + d.SetNull() + return d + } + + if key.Cmp(recordPrefix) <= 0 { + d.SetBytes([]byte{}) + return d + } + + encodedVal := key[len(recordPrefix):] + if encodedVal[0] < emptyBytesHandleKey[0] { + d.SetBytes([]byte{}) + return d + } + + if encodedVal[0] > emptyBytesHandleKey[0] { + d.SetNull() + return d + } + + if _, v, err := codec.DecodeOne(encodedVal); err == nil { + return v + } + + encodedVal = encodedVal[1:] + brokenGroupEndIdx := len(encodedVal) - 1 + brokenGroupEmptyBytes := len(encodedVal) % 9 + for i := 7; i+1 < len(encodedVal); i += 9 { + if emptyBytes := 255 - int(encodedVal[i+1]); emptyBytes != 0 || i+1 == len(encodedVal)-1 { + brokenGroupEndIdx = i + brokenGroupEmptyBytes = emptyBytes + break + } + } + + for i := 0; i < brokenGroupEmptyBytes; i++ { + if encodedVal[brokenGroupEndIdx] > 0 { + break + } + brokenGroupEndIdx-- + } + + if brokenGroupEndIdx < 0 { + d.SetBytes(nil) + return d + } + + val := make([]byte, 0, len(encodedVal)) + for i := 0; i <= brokenGroupEndIdx; i++ { + if i%9 == 8 { + continue + } + val = append(val, encodedVal[i]) + } + d.SetBytes(val) + return d +} diff --git a/ttl/sqlbuilder/sql.go b/ttl/sqlbuilder/sql.go index 4a27323da19a4..833327c7af404 100644 --- a/ttl/sqlbuilder/sql.go +++ b/ttl/sqlbuilder/sql.go @@ -40,8 +40,10 @@ func writeHex(in io.Writer, d types.Datum) error { } func writeDatum(restoreCtx *format.RestoreCtx, d types.Datum, ft *types.FieldType) error { - switch d.Kind() { - case types.KindString, types.KindBytes, types.KindBinaryLiteral: + switch ft.GetType() { + case mysql.TypeBit, mysql.TypeBlob, mysql.TypeLongBlob, mysql.TypeTinyBlob: + return writeHex(restoreCtx.In, d) + case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar: if mysql.HasBinaryFlag(ft.GetFlag()) { return writeHex(restoreCtx.In, d) } @@ -311,6 +313,7 @@ type ScanQueryGenerator struct { keyRangeEnd 
[]types.Datum stack [][]types.Datum limit int + firstBuild bool exhausted bool } @@ -333,6 +336,7 @@ func NewScanQueryGenerator(tbl *cache.PhysicalTable, expire time.Time, rangeStar expire: expire, keyRangeStart: rangeStart, keyRangeEnd: rangeEnd, + firstBuild: true, }, nil } @@ -346,6 +350,10 @@ func (g *ScanQueryGenerator) NextSQL(continueFromResult [][]types.Datum, nextLim return "", errors.Errorf("invalid limit '%d'", nextLimit) } + defer func() { + g.firstBuild = false + }() + if g.stack == nil { g.stack = make([][]types.Datum, 0, len(g.tbl.KeyColumns)) } @@ -416,7 +424,13 @@ func (g *ScanQueryGenerator) buildSQL() (string, error) { var err error if i < len(g.stack)-1 { err = b.WriteCommonCondition(col, "=", val) + } else if g.firstBuild { + // When `g.firstBuild == true`, that means we are querying rows after range start, because range is defined + // as [start, end), we should use ">=" to find the rows including start key. + err = b.WriteCommonCondition(col, ">=", val) } else { + // Otherwise when `g.firstBuild != true`, that means we are continuing with the previous result, we should use + // ">" to exclude the previous row. 
err = b.WriteCommonCondition(col, ">", val) } if err != nil { @@ -426,7 +440,7 @@ func (g *ScanQueryGenerator) buildSQL() (string, error) { } if len(g.keyRangeEnd) > 0 { - if err := b.WriteCommonCondition(g.tbl.KeyColumns, "<=", g.keyRangeEnd); err != nil { + if err := b.WriteCommonCondition(g.tbl.KeyColumns, "<", g.keyRangeEnd); err != nil { return "", err } } diff --git a/ttl/sqlbuilder/sql_test.go b/ttl/sqlbuilder/sql_test.go index dd6f58d9046ad..3bc982a092014 100644 --- a/ttl/sqlbuilder/sql_test.go +++ b/ttl/sqlbuilder/sql_test.go @@ -298,7 +298,7 @@ func TestFormatSQLDatum(t *testing.T) { tk.MustQuery("select id from t where " + colName + "=" + s).Check(testkit.Rows(rowID)) } if c.hex { - require.True(t, strings.HasPrefix(s, "x'"), s) + require.True(t, strings.HasPrefix(s, "x'"), "ft: %s, got: %s", c.ft, s) } } } @@ -499,7 +499,7 @@ func TestScanQueryGenerator(t *testing.T) { Name: model.NewCIStr("t1"), }, KeyColumns: []*model.ColumnInfo{ - {Name: model.NewCIStr("id"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, + {Name: model.NewCIStr("id"), FieldType: *types.NewFieldType(mysql.TypeInt24)}, }, TimeColumn: &model.ColumnInfo{ Name: model.NewCIStr("time"), @@ -513,8 +513,8 @@ func TestScanQueryGenerator(t *testing.T) { Name: model.NewCIStr("t2"), }, KeyColumns: []*model.ColumnInfo{ - {Name: model.NewCIStr("a"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, - {Name: model.NewCIStr("b"), FieldType: *types.NewFieldType(mysql.TypeInt24)}, + {Name: model.NewCIStr("a"), FieldType: *types.NewFieldType(mysql.TypeInt24)}, + {Name: model.NewCIStr("b"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, {Name: model.NewCIStr("c"), FieldType: types.NewFieldTypeBuilder().SetType(mysql.TypeString).SetFlag(mysql.BinaryFlag).Build()}, }, TimeColumn: &model.ColumnInfo{ @@ -570,11 +570,11 @@ func TestScanQueryGenerator(t *testing.T) { path: [][]interface{}{ { nil, 3, - "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 1 AND `id` <= 100 AND `time` < 
'1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 3", + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` >= 1 AND `id` < 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 3", }, { result(d(10), 3), 5, - "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 10 AND `id` <= 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 5", + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 10 AND `id` < 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 5", }, { result(d(15), 4), 5, @@ -663,39 +663,39 @@ func TestScanQueryGenerator(t *testing.T) { path: [][]interface{}{ { nil, 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'0e' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` >= x'0e' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "x", []byte{0x1a}), 5), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'1a' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'1a' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "x", []byte{0x20}), 4), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'x' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'x' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "y", []byte{0x0a}), 5), 5, - "SELECT 
LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'y' AND `c` > x'0a' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'y' AND `c` > x'0a' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "y", []byte{0x11}), 4), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'y' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'y' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "z", []byte{0x02}), 4), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 1 AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 1 AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(3, "a", []byte{0x01}), 5), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 3 AND `b` = 'a' AND `c` > x'01' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 3 AND `b` = 'a' AND `c` > x'01' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(3, "a", []byte{0x11}), 4), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 3 AND `b` > 'a' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` 
FROM `test2`.`t2` WHERE `a` = 3 AND `b` > 'a' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(3, "c", []byte{0x12}), 4), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 3 AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 3 AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(5, "e", []byte{0xa1}), 4), 5, "", @@ -735,7 +735,7 @@ func TestBuildDeleteSQL(t *testing.T) { Name: model.NewCIStr("t1"), }, KeyColumns: []*model.ColumnInfo{ - {Name: model.NewCIStr("id"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, + {Name: model.NewCIStr("id"), FieldType: *types.NewFieldType(mysql.TypeInt24)}, }, TimeColumn: &model.ColumnInfo{ Name: model.NewCIStr("time"), @@ -749,8 +749,8 @@ func TestBuildDeleteSQL(t *testing.T) { Name: model.NewCIStr("t2"), }, KeyColumns: []*model.ColumnInfo{ - {Name: model.NewCIStr("a"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, - {Name: model.NewCIStr("b"), FieldType: *types.NewFieldType(mysql.TypeInt24)}, + {Name: model.NewCIStr("a"), FieldType: *types.NewFieldType(mysql.TypeInt24)}, + {Name: model.NewCIStr("b"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, }, TimeColumn: &model.ColumnInfo{ Name: model.NewCIStr("time"), diff --git a/ttl/ttlworker/scan.go b/ttl/ttlworker/scan.go index be77fdd317a87..6cb18a2a9e346 100644 --- a/ttl/ttlworker/scan.go +++ b/ttl/ttlworker/scan.go @@ -65,8 +65,7 @@ func (s *ttlStatistics) Reset() { type ttlScanTask struct { tbl *cache.PhysicalTable expire time.Time - rangeStart []types.Datum - rangeEnd []types.Datum + scanRange cache.ScanRange statistics *ttlStatistics } @@ -105,7 +104,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s }() sess := newTableSession(rawSess, 
t.tbl, t.expire) - generator, err := sqlbuilder.NewScanQueryGenerator(t.tbl, t.expire, t.rangeStart, t.rangeEnd) + generator, err := sqlbuilder.NewScanQueryGenerator(t.tbl, t.expire, t.scanRange.Start, t.scanRange.End) if err != nil { return t.result(err) } diff --git a/ttl/ttlworker/scan_test.go b/ttl/ttlworker/scan_test.go index 1eeb7a306afca..96d103d061876 100644 --- a/ttl/ttlworker/scan_test.go +++ b/ttl/ttlworker/scan_test.go @@ -220,9 +220,11 @@ func newMockScanTask(t *testing.T, sqlCnt int) *mockScanTask { task := &mockScanTask{ t: t, ttlScanTask: &ttlScanTask{ - tbl: tbl, - expire: time.UnixMilli(0), - rangeStart: []types.Datum{types.NewIntDatum(0)}, + tbl: tbl, + expire: time.UnixMilli(0), + scanRange: cache.ScanRange{ + Start: []types.Datum{types.NewIntDatum(0)}, + }, statistics: &ttlStatistics{}, }, tbl: tbl, @@ -236,7 +238,11 @@ func newMockScanTask(t *testing.T, sqlCnt int) *mockScanTask { } func (t *mockScanTask) selectSQL(i int) string { - return fmt.Sprintf("SELECT LOW_PRIORITY `_tidb_rowid` FROM `test`.`t1` WHERE `_tidb_rowid` > %d AND `time` < '1970-01-01 08:00:00' ORDER BY `_tidb_rowid` ASC LIMIT 3", i*100) + op := ">" + if i == 0 { + op = ">=" + } + return fmt.Sprintf("SELECT LOW_PRIORITY `_tidb_rowid` FROM `test`.`t1` WHERE `_tidb_rowid` %s %d AND `time` < '1970-01-01 08:00:00' ORDER BY `_tidb_rowid` ASC LIMIT 3", op, i*100) } func (t *mockScanTask) runDoScanForTest(delTaskCnt int, errString string) *ttlScanTaskExecResult {