From 74f4c528005359532e23f406efbcbed218723a16 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Thu, 24 Nov 2022 14:35:30 +0800 Subject: [PATCH 1/6] ttl: Add some tools to build SQLs for TTL --- ttl/BUILD.bazel | 41 ++++ ttl/main_test.go | 33 +++ ttl/sql.go | 442 +++++++++++++++++++++++++++++++++ ttl/sql_test.go | 619 +++++++++++++++++++++++++++++++++++++++++++++++ ttl/table.go | 41 ++++ 5 files changed, 1176 insertions(+) create mode 100644 ttl/BUILD.bazel create mode 100644 ttl/main_test.go create mode 100644 ttl/sql.go create mode 100644 ttl/sql_test.go create mode 100644 ttl/table.go diff --git a/ttl/BUILD.bazel b/ttl/BUILD.bazel new file mode 100644 index 0000000000000..8e82a7053df67 --- /dev/null +++ b/ttl/BUILD.bazel @@ -0,0 +1,41 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "ttl", + srcs = [ + "sql.go", + "table.go", + ], + importpath = "github.com/pingcap/tidb/ttl", + visibility = ["//visibility:public"], + deps = [ + "//parser/ast", + "//parser/format", + "//parser/model", + "//parser/mysql", + "//types", + "//util/sqlexec", + "@com_github_pingcap_errors//:errors", + "@com_github_pkg_errors//:errors", + ], +) + +go_test( + name = "ttl_test", + srcs = [ + "main_test.go", + "sql_test.go", + ], + deps = [ + ":ttl", + "//kv", + "//parser/model", + "//parser/mysql", + "//testkit", + "//testkit/testsetup", + "//types", + "//util/sqlexec", + "@com_github_stretchr_testify//require", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/ttl/main_test.go b/ttl/main_test.go new file mode 100644 index 0000000000000..8bda0eb98eeef --- /dev/null +++ b/ttl/main_test.go @@ -0,0 +1,33 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ttl_test + +import ( + "testing" + + "github.com/pingcap/tidb/testkit/testsetup" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + testsetup.SetupForCommonTest() + opts := []goleak.Option{ + goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), + goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), + goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + } + goleak.VerifyTestMain(m, opts...) +} diff --git a/ttl/sql.go b/ttl/sql.go new file mode 100644 index 0000000000000..0c7f6602ced31 --- /dev/null +++ b/ttl/sql.go @@ -0,0 +1,442 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package ttl + +import ( + "encoding/hex" + "fmt" + "io" + "strconv" + "strings" + "time" + + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/format" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/sqlexec" + "github.com/pkg/errors" +) + +func writeHex(in io.Writer, d types.Datum) error { + _, err := fmt.Fprintf(in, "x'%s'", hex.EncodeToString(d.GetBytes())) + return err +} + +func writeDatum(in io.Writer, d types.Datum, ft *types.FieldType) error { + switch d.Kind() { + case types.KindString, types.KindBytes, types.KindBinaryLiteral: + if mysql.HasBinaryFlag(ft.GetFlag()) { + return writeHex(in, d) + } + _, err := fmt.Fprintf(in, "'%s'", sqlexec.EscapeString(d.GetString())) + return err + } + ctx := format.NewRestoreCtx(format.DefaultRestoreFlags, in) + expr := ast.NewValueExpr(d.GetValue(), ft.GetCharset(), ft.GetCollate()) + return expr.Restore(ctx) +} + +// FormatSQLDatum formats the datum to a value string in sql +func FormatSQLDatum(d types.Datum, ft *types.FieldType) (string, error) { + var sb strings.Builder + if err := writeDatum(&sb, d, ft); err != nil { + return "", err + } + return sb.String(), nil +} + +type sqlBuilderState int + +const ( + writeBegin sqlBuilderState = iota + writeSelOrDel + writeWhere + writeOrderBy + writeLimit + writeDone +) + +// SQLBuilder is used to build SQLs for TTL +type SQLBuilder struct { + tbl *PhysicalTable + sb strings.Builder + state sqlBuilderState + + isReadOnly bool + hasWriteExpireCond bool +} + +// NewSQLBuilder creates a new SQLBuilder +func NewSQLBuilder(tbl *PhysicalTable) *SQLBuilder { + return &SQLBuilder{tbl: tbl, state: writeBegin} +} + +// Build builds the final sql +func (b *SQLBuilder) Build() (string, error) { + if b.state == writeBegin { + return "", errors.Errorf("invalid state: %v", b.state) + } + + if !b.isReadOnly && !b.hasWriteExpireCond { + // check whether the `timeRow < 
expire_time` condition has been written to make sure this SQL is safe. + return "", errors.New("expire condition not write") + } + + if b.state != writeDone { + b.state = writeDone + } + + return b.sb.String(), nil +} + +// WriteSelect writes a select statement to select key columns without any condition +func (b *SQLBuilder) WriteSelect() error { + if b.state != writeBegin { + return errors.Errorf("invalid state: %v", b.state) + } + b.sb.WriteString("SELECT LOW_PRIORITY ") + b.writeColNames(b.tbl.KeyColumns, false) + b.sb.WriteString(" FROM ") + b.writeTblName() + if par := b.tbl.PartitionDef; par != nil { + b.sb.WriteString(" PARTITION(`") + b.sb.WriteString(par.Name.O) + b.sb.WriteString("`)") + } + b.state = writeSelOrDel + b.isReadOnly = true + return nil +} + +// WriteDelete writes a delete statement without any condition +func (b *SQLBuilder) WriteDelete() error { + if b.state != writeBegin { + return errors.Errorf("invalid state: %v", b.state) + } + b.sb.WriteString("DELETE LOW_PRIORITY FROM ") + b.writeTblName() + if par := b.tbl.PartitionDef; par != nil { + b.sb.WriteString(" PARTITION(`") + b.sb.WriteString(par.Name.O) + b.sb.WriteString("`)") + } + b.state = writeSelOrDel + return nil +} + +// WriteCommonCondition writes a new condition +func (b *SQLBuilder) WriteCommonCondition(cols []*model.ColumnInfo, op string, dp []types.Datum) error { + switch b.state { + case writeSelOrDel: + b.sb.WriteString(" WHERE ") + b.state = writeWhere + case writeWhere: + b.sb.WriteString(" AND ") + default: + return errors.Errorf("invalid state: %v", b.state) + } + + b.writeColNames(cols, len(cols) > 1) + b.sb.WriteRune(' ') + b.sb.WriteString(op) + b.sb.WriteRune(' ') + return b.writeDataPoint(cols, dp) +} + +// WriteExpireCondition writes a condition with the time column +func (b *SQLBuilder) WriteExpireCondition(expire time.Time) error { + switch b.state { + case writeSelOrDel: + b.sb.WriteString(" WHERE ") + b.state = writeWhere + case writeWhere: + 
b.sb.WriteString(" AND ") + default: + return errors.Errorf("invalid state: %v", b.state) + } + + b.writeColNames([]*model.ColumnInfo{b.tbl.TimeColumn}, false) + b.sb.WriteString(" < ") + b.sb.WriteString("'") + b.sb.WriteString(expire.Format("2006-01-02 15:04:05.999999")) + b.sb.WriteString("'") + b.hasWriteExpireCond = true + return nil +} + +// WriteInCondition writes an IN condition +func (b *SQLBuilder) WriteInCondition(cols []*model.ColumnInfo, dps ...[]types.Datum) error { + switch b.state { + case writeSelOrDel: + b.sb.WriteString(" WHERE ") + b.state = writeWhere + case writeWhere: + b.sb.WriteString(" AND ") + default: + return errors.Errorf("invalid state: %v", b.state) + } + + b.writeColNames(cols, len(cols) > 1) + b.sb.WriteString(" IN ") + b.sb.WriteRune('(') + first := true + for _, v := range dps { + if first { + first = false + } else { + b.sb.WriteString(", ") + } + if err := b.writeDataPoint(cols, v); err != nil { + return err + } + } + b.sb.WriteRune(')') + return nil +} + +// WriteOrderBy writes order by +func (b *SQLBuilder) WriteOrderBy(cols []*model.ColumnInfo, desc bool) error { + if b.state != writeSelOrDel && b.state != writeWhere { + return errors.Errorf("invalid state: %v", b.state) + } + b.state = writeOrderBy + b.sb.WriteString(" ORDER BY ") + b.writeColNames(cols, false) + if desc { + b.sb.WriteString(" DESC") + } else { + b.sb.WriteString(" ASC") + } + return nil +} + +// WriteLimit writes the limit +func (b *SQLBuilder) WriteLimit(n int) error { + if b.state != writeSelOrDel && b.state != writeWhere && b.state != writeOrderBy { + return errors.Errorf("invalid state: %v", b.state) + } + b.state = writeLimit + b.sb.WriteString(" LIMIT ") + b.sb.WriteString(strconv.Itoa(n)) + return nil +} + +func (b *SQLBuilder) writeTblName() { + b.sb.WriteRune('`') + b.sb.WriteString(b.tbl.Schema.O) + b.sb.WriteString("`.`") + b.sb.WriteString(b.tbl.Name.O) + b.sb.WriteRune('`') +} + +func (b *SQLBuilder) writeColName(col *model.ColumnInfo) { + 
b.sb.WriteRune('`') + b.sb.WriteString(col.Name.O) + b.sb.WriteRune('`') +} + +func (b *SQLBuilder) writeColNames(cols []*model.ColumnInfo, writeBrackets bool) { + if writeBrackets { + b.sb.WriteRune('(') + } + + first := true + for _, col := range cols { + if first { + first = false + } else { + b.sb.WriteString(", ") + } + b.writeColName(col) + } + + if writeBrackets { + b.sb.WriteRune(')') + } +} + +func (b *SQLBuilder) writeDataPoint(cols []*model.ColumnInfo, dp []types.Datum) error { + writeBrackets := len(cols) > 1 + if len(cols) != len(dp) { + return errors.Errorf("col count not match %d != %d", len(cols), len(dp)) + } + + if writeBrackets { + b.sb.WriteRune('(') + } + + first := true + for i, d := range dp { + if first { + first = false + } else { + b.sb.WriteString(", ") + } + if err := writeDatum(&b.sb, d, &cols[i].FieldType); err != nil { + return err + } + } + + if writeBrackets { + b.sb.WriteRune(')') + } + + return nil +} + +// ScanQueryGenerator generates SQLs for scan task +type ScanQueryGenerator struct { + tbl *PhysicalTable + expire time.Time + keyRangeStart []types.Datum + keyRangeEnd []types.Datum + stack [][]types.Datum + limit int + exhausted bool +} + +// NewScanQueryGenerator creates a new ScanQueryGenerator +func NewScanQueryGenerator(tbl *PhysicalTable, expire time.Time, rangeStart []types.Datum, rangeEnd []types.Datum) (*ScanQueryGenerator, error) { + if len(rangeStart) > 0 { + if err := tbl.ValidateKey(rangeStart); err != nil { + return nil, err + } + } + + if len(rangeEnd) > 0 { + if err := tbl.ValidateKey(rangeEnd); err != nil { + return nil, err + } + } + + return &ScanQueryGenerator{ + tbl: tbl, + expire: expire, + keyRangeStart: rangeStart, + keyRangeEnd: rangeEnd, + }, nil +} + +// NextSQL creates next sql of the scan task +func (g *ScanQueryGenerator) NextSQL(continueFromResult [][]types.Datum, nextLimit int) (string, error) { + if g.exhausted { + return "", errors.New("generator is exhausted") + } + + if nextLimit <= 0 { + 
return "", errors.Errorf("invalid limit '%d'", nextLimit) + } + + if g.stack == nil { + g.stack = make([][]types.Datum, 0, len(g.tbl.KeyColumns)) + } + + if len(continueFromResult) >= g.limit { + var continueFromKey []types.Datum + if cnt := len(continueFromResult); cnt > 0 { + continueFromKey = continueFromResult[cnt-1] + } + if err := g.setStack(continueFromKey); err != nil { + return "", err + } + } else { + if l := len(g.stack); l > 0 { + g.stack = g.stack[:l-1] + } + if len(g.stack) == 0 { + g.exhausted = true + } + } + g.limit = nextLimit + return g.buildSQL() +} + +// IsExhausted returns whether the generator is exhausted +func (g *ScanQueryGenerator) IsExhausted() bool { + return g.exhausted +} + +func (g *ScanQueryGenerator) setStack(key []types.Datum) error { + if key == nil { + key = g.keyRangeStart + } + + if key == nil { + g.stack = g.stack[:0] + return nil + } + + if err := g.tbl.ValidateKey(key); err != nil { + return err + } + + g.stack = g.stack[:cap(g.stack)] + for i := 0; i < len(key); i++ { + g.stack[i] = key[0 : i+1] + } + return nil +} + +func (g *ScanQueryGenerator) buildSQL() (string, error) { + if g.limit <= 0 { + return "", errors.Errorf("invalid limit '%d'", g.limit) + } + + if g.exhausted { + return "", nil + } + + b := NewSQLBuilder(g.tbl) + if err := b.WriteSelect(); err != nil { + return "", err + } + if len(g.stack) > 0 { + for i, d := range g.stack[len(g.stack)-1] { + col := []*model.ColumnInfo{g.tbl.KeyColumns[i]} + val := []types.Datum{d} + var err error + if i < len(g.stack)-1 { + err = b.WriteCommonCondition(col, "=", val) + } else { + err = b.WriteCommonCondition(col, ">", val) + } + if err != nil { + return "", err + } + } + } + + if len(g.keyRangeEnd) > 0 { + if err := b.WriteCommonCondition(g.tbl.KeyColumns, "<=", g.keyRangeEnd); err != nil { + return "", err + } + } + + if err := b.WriteExpireCondition(g.expire); err != nil { + return "", err + } + + if err := b.WriteOrderBy(g.tbl.KeyColumns, false); err != nil { + return 
"", err + } + + if err := b.WriteLimit(g.limit); err != nil { + return "", err + } + + return b.Build() +} diff --git a/ttl/sql_test.go b/ttl/sql_test.go new file mode 100644 index 0000000000000..fe92920edda8d --- /dev/null +++ b/ttl/sql_test.go @@ -0,0 +1,619 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ttl_test + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/ttl" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/sqlexec" + "github.com/stretchr/testify/require" +) + +func TestFormatSQLDatum(t *testing.T) { + cases := []struct { + ft string + values []interface{} + hex bool + notSupport bool + }{ + { + ft: "int", + values: []interface{}{1, 2, 3, -12}, + }, + { + ft: "decimal(5, 2)", + values: []interface{}{"0.3", "128.71", "-245.32"}, + }, + { + ft: "varchar(32) CHARACTER SET latin1", + values: []interface{}{ + "aa';delete from t where 1;", + string([]byte{0xf1, 0xf2}), + string([]byte{0xf1, 0xf2, 0xf3, 0xf4}), + }, + }, + { + ft: "char(32) CHARACTER SET utf8mb4", + values: []interface{}{ + "demo", + "\n123", + "aa';delete from t where 1;", + "你好👋", + }, + }, + { + ft: "varchar(32) CHARACTER SET utf8mb4", + values: []interface{}{ + "demo", + "aa';delete from t where 1;", + "你好👋", + }, + }, + { + ft: 
"varchar(32) CHARACTER SET binary", + values: []interface{}{ + string([]byte{0xf1, 0xf2, 0xf3, 0xf4}), + "你好👋", + "abcdef", + }, + hex: true, + }, + { + ft: "binary(8)", + values: []interface{}{ + string([]byte{0xf1, 0xf2}), + string([]byte{0xf1, 0xf2, 0xf3, 0xf4}), + }, + hex: true, + }, + { + ft: "blob", + values: []interface{}{ + string([]byte{0xf1, 0xf2}), + string([]byte{0xf1, 0xf2, 0xf3, 0xf4}), + }, + hex: true, + }, + { + ft: "bit(1)", + values: []interface{}{0, 1}, + hex: true, + }, + { + ft: "date", + values: []interface{}{"2022-01-02", "1900-12-31"}, + }, + { + ft: "time", + values: []interface{}{"00:00", "01:23", "13:51:22"}, + }, + { + ft: "datetime", + values: []interface{}{"2022-01-02 12:11:11", "2022-01-02"}, + }, + { + ft: "timestamp", + values: []interface{}{"2022-01-02 12:11:11", "2022-01-02"}, + }, + { + ft: "json", + values: []interface{}{"{}"}, + notSupport: true, + }, + } + + store, do := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + // create a table with n columns + var sb strings.Builder + sb.WriteString("CREATE TABLE t (id varchar(32) primary key") + for i, c := range cases { + _, err := fmt.Fprintf(&sb, ",\n col%d %s DEFAULT NULL", i, c.ft) + require.NoError(t, err) + } + sb.WriteString("\n);") + tk.MustExec(sb.String()) + + tbl, err := do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + + for i, c := range cases { + for j, v := range c.values { + tk.MustExec(fmt.Sprintf("insert into t (id, col%d) values ('%d-%d', ?)", i, i, j), v) + } + } + + ctx := kv.WithInternalSourceType(context.TODO(), kv.InternalTxnOthers) + for i, c := range cases { + for j := range c.values { + rowID := fmt.Sprintf("%d-%d", i, j) + colName := fmt.Sprintf("col%d", i) + exec, ok := tk.Session().(sqlexec.SQLExecutor) + require.True(t, ok) + selectSQL := fmt.Sprintf("select %s from t where id='%s'", colName, rowID) + rs, err := exec.ExecuteInternal(ctx, 
selectSQL) + require.NoError(t, err, selectSQL) + rows, err := sqlexec.DrainRecordSet(ctx, rs, 1) + require.NoError(t, err, selectSQL) + require.Equal(t, 1, len(rows), selectSQL) + col := tbl.Meta().FindPublicColumnByName(colName) + d := rows[0].GetDatum(0, &col.FieldType) + s, err := ttl.FormatSQLDatum(d, &col.FieldType) + if c.notSupport { + require.Error(t, err) + } else { + require.NoError(t, err) + //fmt.Printf("%s: %s\n", c.ft, s) + tk.MustQuery("select id from t where " + colName + "=" + s).Check(testkit.Rows(rowID)) + } + if c.hex { + require.True(t, strings.HasPrefix(s, "x'"), s) + } + } + } +} + +func TestSQLBuilder(t *testing.T) { + must := func(err error) { + require.NoError(t, err) + } + + mustBuild := func(b *ttl.SQLBuilder, str string) { + s, err := b.Build() + require.NoError(t, err) + require.Equal(t, str, s) + } + + var b *ttl.SQLBuilder + + t1 := &ttl.PhysicalTable{ + Schema: model.NewCIStr("test"), + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("t1"), + }, + KeyColumns: []*model.ColumnInfo{ + {Name: model.NewCIStr("id"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, + }, + TimeColumn: &model.ColumnInfo{ + Name: model.NewCIStr("time"), + FieldType: *types.NewFieldType(mysql.TypeDatetime), + }, + } + + t2 := &ttl.PhysicalTable{ + Schema: model.NewCIStr("test2"), + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("t2"), + }, + KeyColumns: []*model.ColumnInfo{ + {Name: model.NewCIStr("a"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, + {Name: model.NewCIStr("b"), FieldType: *types.NewFieldType(mysql.TypeInt24)}, + }, + TimeColumn: &model.ColumnInfo{ + Name: model.NewCIStr("time"), + FieldType: *types.NewFieldType(mysql.TypeDatetime), + }, + } + + tp := &ttl.PhysicalTable{ + Schema: model.NewCIStr("testp"), + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("tp"), + }, + KeyColumns: t1.KeyColumns, + TimeColumn: t1.TimeColumn, + PartitionDef: &model.PartitionDefinition{ + Name: model.NewCIStr("p1"), + }, + } + + // test build 
select queries + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1`") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + must(b.WriteCommonCondition(t1.KeyColumns, ">", d("a1"))) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 'a1'") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + must(b.WriteCommonCondition(t1.KeyColumns, ">", d("a1"))) + must(b.WriteCommonCondition(t1.KeyColumns, "<=", d("c3"))) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 'a1' AND `id` <= 'c3'") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + shLoc, err := time.LoadLocation("Asia/Shanghai") + require.NoError(t, err) + must(b.WriteExpireCondition(time.UnixMilli(0).In(shLoc))) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `time` < '1970-01-01 08:00:00'") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + must(b.WriteCommonCondition(t1.KeyColumns, ">", d("a1"))) + must(b.WriteCommonCondition(t1.KeyColumns, "<=", d("c3"))) + must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 'a1' AND `id` <= 'c3' AND `time` < '1970-01-01 00:00:00'") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + must(b.WriteOrderBy(t1.KeyColumns, false)) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` ORDER BY `id` ASC") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + must(b.WriteOrderBy(t1.KeyColumns, true)) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` ORDER BY `id` DESC") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + must(b.WriteOrderBy(t1.KeyColumns, false)) + must(b.WriteLimit(128)) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` ORDER BY `id` ASC LIMIT 128") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + must(b.WriteCommonCondition(t1.KeyColumns, ">", d("';``~?%\"\n"))) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE 
`id` > '\\';``~?%\\\"\\n'") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + must(b.WriteCommonCondition(t1.KeyColumns, ">", d("a1';'"))) + must(b.WriteCommonCondition(t1.KeyColumns, "<=", d("a2\""))) + must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) + must(b.WriteOrderBy(t1.KeyColumns, false)) + must(b.WriteLimit(128)) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 'a1\\';\\'' AND `id` <= 'a2\\\"' AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 128") + + b = ttl.NewSQLBuilder(t2) + must(b.WriteSelect()) + must(b.WriteCommonCondition(t2.KeyColumns, ">", d("x1", 20))) + mustBuild(b, "SELECT LOW_PRIORITY `a`, `b` FROM `test2`.`t2` WHERE (`a`, `b`) > ('x1', 20)") + + b = ttl.NewSQLBuilder(t2) + must(b.WriteSelect()) + must(b.WriteCommonCondition(t2.KeyColumns, "<=", d("x2", 21))) + must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) + must(b.WriteOrderBy(t2.KeyColumns, false)) + must(b.WriteLimit(100)) + mustBuild(b, "SELECT LOW_PRIORITY `a`, `b` FROM `test2`.`t2` WHERE (`a`, `b`) <= ('x2', 21) AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b` ASC LIMIT 100") + + b = ttl.NewSQLBuilder(t2) + must(b.WriteSelect()) + must(b.WriteCommonCondition(t2.KeyColumns[0:1], "=", d("x3"))) + must(b.WriteCommonCondition(t2.KeyColumns[1:2], ">", d(31))) + must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) + must(b.WriteOrderBy(t2.KeyColumns, false)) + must(b.WriteLimit(100)) + mustBuild(b, "SELECT LOW_PRIORITY `a`, `b` FROM `test2`.`t2` WHERE `a` = 'x3' AND `b` > 31 AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b` ASC LIMIT 100") + + // test build delete queries + b = ttl.NewSQLBuilder(t1) + must(b.WriteDelete()) + _, err = b.Build() + require.EqualError(t, err, "expire condition not write!") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteDelete()) + must(b.WriteInCondition(t1.KeyColumns, d("a"))) + must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) + mustBuild(b, "DELETE LOW_PRIORITY FROM 
`test`.`t1` WHERE `id` IN ('a') AND `time` < '1970-01-01 00:00:00'") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteDelete()) + must(b.WriteInCondition(t1.KeyColumns, d("a"), d("b"))) + must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) + mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE `id` IN ('a', 'b') AND `time` < '1970-01-01 00:00:00'") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteDelete()) + must(b.WriteInCondition(t2.KeyColumns, d("a", 1))) + must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) + must(b.WriteLimit(100)) + mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE (`a`, `b`) IN (('a', 1)) AND `time` < '1970-01-01 00:00:00' LIMIT 100") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteDelete()) + must(b.WriteInCondition(t2.KeyColumns, d("a", 1), d("b", 2))) + must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) + must(b.WriteLimit(100)) + mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE (`a`, `b`) IN (('a', 1), ('b', 2)) AND `time` < '1970-01-01 00:00:00' LIMIT 100") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteDelete()) + must(b.WriteInCondition(t2.KeyColumns, d("a", 1), d("b", 2))) + must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) + mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE (`a`, `b`) IN (('a', 1), ('b', 2)) AND `time` < '1970-01-01 00:00:00'") + + // test select partition table + b = ttl.NewSQLBuilder(tp) + must(b.WriteSelect()) + must(b.WriteCommonCondition(tp.KeyColumns, ">", d("a1"))) + must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `testp`.`tp` PARTITION(`p1`) WHERE `id` > 'a1' AND `time` < '1970-01-01 00:00:00'") + + b = ttl.NewSQLBuilder(tp) + must(b.WriteDelete()) + must(b.WriteInCondition(tp.KeyColumns, d("a"), d("b"))) + must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) + mustBuild(b, "DELETE LOW_PRIORITY FROM `testp`.`tp` PARTITION(`p1`) WHERE `id` IN ('a', 'b') AND `time` < '1970-01-01 00:00:00'") +} + 
+func TestScanQueryGenerator(t *testing.T) { + t1 := &ttl.PhysicalTable{ + Schema: model.NewCIStr("test"), + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("t1"), + }, + KeyColumns: []*model.ColumnInfo{ + {Name: model.NewCIStr("id"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, + }, + TimeColumn: &model.ColumnInfo{ + Name: model.NewCIStr("time"), + FieldType: *types.NewFieldType(mysql.TypeDatetime), + }, + } + + t2 := &ttl.PhysicalTable{ + Schema: model.NewCIStr("test2"), + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("t2"), + }, + KeyColumns: []*model.ColumnInfo{ + {Name: model.NewCIStr("a"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, + {Name: model.NewCIStr("b"), FieldType: *types.NewFieldType(mysql.TypeInt24)}, + {Name: model.NewCIStr("c"), FieldType: types.NewFieldTypeBuilder().SetType(mysql.TypeString).SetFlag(mysql.BinaryFlag).Build()}, + }, + TimeColumn: &model.ColumnInfo{ + Name: model.NewCIStr("time"), + FieldType: *types.NewFieldType(mysql.TypeDatetime), + }, + } + + result := func(last []types.Datum, n int) [][]types.Datum { + ds := make([][]types.Datum, n) + ds[n-1] = last + return ds + } + + cases := []struct { + tbl *ttl.PhysicalTable + expire time.Time + rangeStart []types.Datum + rangeEnd []types.Datum + path [][]interface{} + }{ + { + tbl: t1, + expire: time.UnixMilli(0).In(time.UTC), + path: [][]interface{}{ + { + nil, 3, + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 3", + }, + { + nil, 5, "", + }, + }, + }, + { + tbl: t1, + expire: time.UnixMilli(0).In(time.UTC), + path: [][]interface{}{ + { + nil, 3, + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 3", + }, + { + [][]types.Datum{}, 5, "", + }, + }, + }, + { + tbl: t1, + expire: time.UnixMilli(0).In(time.UTC), + rangeStart: d(1), + rangeEnd: d(100), + path: [][]interface{}{ + { + nil, 3, + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 1 AND 
`id` <= 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 3", + }, + { + result(d(10), 3), 5, + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 10 AND `id` <= 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 5", + }, + { + result(d(15), 4), 5, + "", + }, + }, + }, + { + tbl: t1, + expire: time.UnixMilli(0).In(time.UTC), + path: [][]interface{}{ + { + nil, 3, + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 3", + }, + { + result(d(2), 3), 5, + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 2 AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 5", + }, + { + result(d(4), 5), 6, + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 4 AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 6", + }, + { + result(d(7), 5), 5, "", + }, + }, + }, + { + tbl: t2, + expire: time.UnixMilli(0).In(time.UTC), + path: [][]interface{}{ + { + nil, 5, + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + }, + { + nil, 5, "", + }, + }, + }, + { + tbl: t2, + expire: time.UnixMilli(0).In(time.UTC), + path: [][]interface{}{ + { + nil, 5, + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + }, + { + nil, 5, "", + }, + }, + }, + { + tbl: t2, + expire: time.UnixMilli(0).In(time.UTC), + path: [][]interface{}{ + { + nil, 5, + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + }, + { + [][]types.Datum{}, 5, "", + }, + }, + }, + { + tbl: t2, + expire: time.UnixMilli(0).In(time.UTC), + path: [][]interface{}{ + { + nil, 5, + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + }, + { + result(d(1, "x", []byte{0xf0}), 4), 5, "", + }, + }, + }, + { + tbl: t2, + expire: 
time.UnixMilli(0).In(time.UTC), + rangeStart: d(1, "x", []byte{0xe}), + rangeEnd: d(100, "z", []byte{0xff}), + path: [][]interface{}{ + { + nil, 5, + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'0e' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + }, + { + result(d(1, "x", []byte{0x1a}), 5), 5, + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'1a' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + }, + { + result(d(1, "x", []byte{0x20}), 4), 5, + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'x' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + }, + { + result(d(1, "y", []byte{0x0a}), 5), 5, + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'y' AND `c` > x'0a' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + }, + { + result(d(1, "y", []byte{0x11}), 4), 5, + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'y' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + }, + { + result(d(1, "z", []byte{0x02}), 4), 5, + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 1 AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + }, + { + result(d(3, "a", []byte{0x01}), 5), 5, + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 3 AND `b` = 'a' AND `c` > x'01' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + }, + { + result(d(3, "a", []byte{0x11}), 4), 5, + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 3 AND `b` > 
'a' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + }, + { + result(d(3, "c", []byte{0x12}), 4), 5, + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 3 AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + }, + { + result(d(5, "e", []byte{0xa1}), 4), 5, "", + }, + }, + }, + } + + for i, c := range cases { + g, err := ttl.NewScanQueryGenerator(c.tbl, c.expire, c.rangeStart, c.rangeEnd) + require.NoError(t, err, fmt.Sprintf("%d", i)) + for j, p := range c.path { + msg := fmt.Sprintf("%d-%d", i, j) + var result [][]types.Datum + require.Equal(t, 3, len(p), msg) + if arg := p[0]; arg != nil { + r, ok := arg.([][]types.Datum) + require.True(t, ok, msg) + result = r + } + limit, ok := p[1].(int) + require.True(t, ok, msg) + sql, ok := p[2].(string) + require.True(t, ok, msg) + s, err := g.NextSQL(result, limit) + require.NoError(t, err, msg) + require.Equal(t, sql, s, msg) + require.Equal(t, s == "", g.IsExhausted(), msg) + } + } +} + +func d(vs ...interface{}) []types.Datum { + datums := make([]types.Datum, len(vs)) + for i, v := range vs { + switch val := v.(type) { + case string: + datums[i] = types.NewStringDatum(val) + case int: + datums[i] = types.NewIntDatum(int64(val)) + case []byte: + datums[i] = types.NewBytesDatum(val) + default: + panic(fmt.Sprintf("invalid value type: %T, value: %v", v, v)) + } + } + return datums +} diff --git a/ttl/table.go b/ttl/table.go new file mode 100644 index 0000000000000..b9c59a34e5c17 --- /dev/null +++ b/ttl/table.go @@ -0,0 +1,41 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ttl + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/types" +) + +// PhysicalTable is used to provide some information for a physical table in TTL job +type PhysicalTable struct { + Schema model.CIStr + *model.TableInfo + // PartitionDef is the partition definition + PartitionDef *model.PartitionDefinition + // KeyColumns is the cluster index key columns for the table + KeyColumns []*model.ColumnInfo + // TimeColumn is the time column used for TTL + TimeColumn *model.ColumnInfo +} + +// ValidateKey checks that key has exactly one datum per key column +func (t *PhysicalTable) ValidateKey(key []types.Datum) error { + if len(t.KeyColumns) != len(key) { + return errors.Errorf("invalid key length: %d, expected %d", len(key), len(t.KeyColumns)) + } + return nil +} From 3515c1951c05fb34e36653dcdd57f73a05bf7812 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Thu, 24 Nov 2022 15:36:52 +0800 Subject: [PATCH 2/6] update --- ttl/sql_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ttl/sql_test.go b/ttl/sql_test.go index fe92920edda8d..c74585c7effcc 100644 --- a/ttl/sql_test.go +++ b/ttl/sql_test.go @@ -317,7 +317,7 @@ func TestSQLBuilder(t *testing.T) { b = ttl.NewSQLBuilder(t1) must(b.WriteDelete()) _, err = b.Build() - require.EqualError(t, err, "expire condition not write!") + require.EqualError(t, err, "expire condition not write") b = ttl.NewSQLBuilder(t1) must(b.WriteDelete()) From 8ac41de59f791fe2b0ef9f24d7b5272dac03e51b Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Thu, 24 Nov 2022 16:45:28 
+0800 Subject: [PATCH 3/6] Add `BuildDeleteSQL` --- ttl/sql.go | 26 +++++++++++++++++++ ttl/sql_test.go | 69 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/ttl/sql.go b/ttl/sql.go index 0c7f6602ced31..940a090cf1d78 100644 --- a/ttl/sql.go +++ b/ttl/sql.go @@ -440,3 +440,29 @@ func (g *ScanQueryGenerator) buildSQL() (string, error) { return b.Build() } + +// BuildDeleteSQL builds a delete SQL +func BuildDeleteSQL(tbl *PhysicalTable, rows [][]types.Datum, expire time.Time) (string, error) { + if len(rows) == 0 { + return "", errors.New("Cannot build delete SQL with empty rows") + } + + b := NewSQLBuilder(tbl) + if err := b.WriteDelete(); err != nil { + return "", err + } + + if err := b.WriteInCondition(tbl.KeyColumns, rows...); err != nil { + return "", err + } + + if err := b.WriteExpireCondition(expire); err != nil { + return "", err + } + + if err := b.WriteLimit(len(rows)); err != nil { + return "", err + } + + return b.Build() +} diff --git a/ttl/sql_test.go b/ttl/sql_test.go index c74585c7effcc..7efcb3a3790a2 100644 --- a/ttl/sql_test.go +++ b/ttl/sql_test.go @@ -601,6 +601,75 @@ func TestScanQueryGenerator(t *testing.T) { } } +func TestBuildDeleteSQL(t *testing.T) { + t1 := &ttl.PhysicalTable{ + Schema: model.NewCIStr("test"), + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("t1"), + }, + KeyColumns: []*model.ColumnInfo{ + {Name: model.NewCIStr("id"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, + }, + TimeColumn: &model.ColumnInfo{ + Name: model.NewCIStr("time"), + FieldType: *types.NewFieldType(mysql.TypeDatetime), + }, + } + + t2 := &ttl.PhysicalTable{ + Schema: model.NewCIStr("test2"), + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("t2"), + }, + KeyColumns: []*model.ColumnInfo{ + {Name: model.NewCIStr("a"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, + {Name: model.NewCIStr("b"), FieldType: *types.NewFieldType(mysql.TypeInt24)}, + }, + TimeColumn: &model.ColumnInfo{ + Name: 
model.NewCIStr("time"), + FieldType: *types.NewFieldType(mysql.TypeDatetime), + }, + } + + cases := []struct { + tbl *ttl.PhysicalTable + expire time.Time + rows [][]types.Datum + sql string + }{ + { + tbl: t1, + expire: time.UnixMilli(0).In(time.UTC), + rows: [][]types.Datum{d(1)}, + sql: "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE `id` IN (1) AND `time` < '1970-01-01 00:00:00' LIMIT 1", + }, + { + tbl: t1, + expire: time.UnixMilli(0).In(time.UTC), + rows: [][]types.Datum{d(2), d(3), d(4)}, + sql: "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE `id` IN (2, 3, 4) AND `time` < '1970-01-01 00:00:00' LIMIT 3", + }, + { + tbl: t2, + expire: time.UnixMilli(0).In(time.UTC), + rows: [][]types.Datum{d(1, "a")}, + sql: "DELETE LOW_PRIORITY FROM `test2`.`t2` WHERE (`a`, `b`) IN ((1, 'a')) AND `time` < '1970-01-01 00:00:00' LIMIT 1", + }, + { + tbl: t2, + expire: time.UnixMilli(0).In(time.UTC), + rows: [][]types.Datum{d(1, "a"), d(2, "b")}, + sql: "DELETE LOW_PRIORITY FROM `test2`.`t2` WHERE (`a`, `b`) IN ((1, 'a'), (2, 'b')) AND `time` < '1970-01-01 00:00:00' LIMIT 2", + }, + } + + for _, c := range cases { + sql, err := ttl.BuildDeleteSQL(c.tbl, c.rows, c.expire) + require.NoError(t, err) + require.Equal(t, c.sql, sql) + } +} + func d(vs ...interface{}) []types.Datum { datums := make([]types.Datum, len(vs)) for i, v := range vs { From 452e86428ef844430264d13aafae9deb673892d0 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Fri, 25 Nov 2022 12:03:31 +0800 Subject: [PATCH 4/6] update --- ttl/sql.go | 120 ++++++++++++++++++++++----------------------- ttl/sql_test.go | 126 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 187 insertions(+), 59 deletions(-) diff --git a/ttl/sql.go b/ttl/sql.go index 940a090cf1d78..9cdf762846d4d 100644 --- a/ttl/sql.go +++ b/ttl/sql.go @@ -36,24 +36,24 @@ func writeHex(in io.Writer, d types.Datum) error { return err } -func writeDatum(in io.Writer, d types.Datum, ft *types.FieldType) error { +func writeDatum(restoreCtx 
*format.RestoreCtx, d types.Datum, ft *types.FieldType) error { switch d.Kind() { case types.KindString, types.KindBytes, types.KindBinaryLiteral: if mysql.HasBinaryFlag(ft.GetFlag()) { - return writeHex(in, d) + return writeHex(restoreCtx.In, d) } - _, err := fmt.Fprintf(in, "'%s'", sqlexec.EscapeString(d.GetString())) + _, err := fmt.Fprintf(restoreCtx.In, "'%s'", sqlexec.EscapeString(d.GetString())) return err } - ctx := format.NewRestoreCtx(format.DefaultRestoreFlags, in) expr := ast.NewValueExpr(d.GetValue(), ft.GetCharset(), ft.GetCollate()) - return expr.Restore(ctx) + return expr.Restore(restoreCtx) } // FormatSQLDatum formats the datum to a value string in sql func FormatSQLDatum(d types.Datum, ft *types.FieldType) (string, error) { var sb strings.Builder - if err := writeDatum(&sb, d, ft); err != nil { + ctx := format.NewRestoreCtx(format.DefaultRestoreFlags, &sb) + if err := writeDatum(ctx, d, ft); err != nil { return "", err } return sb.String(), nil @@ -72,9 +72,10 @@ const ( // SQLBuilder is used to build SQLs for TTL type SQLBuilder struct { - tbl *PhysicalTable - sb strings.Builder - state sqlBuilderState + tbl *PhysicalTable + sb strings.Builder + restoreCtx *format.RestoreCtx + state sqlBuilderState isReadOnly bool hasWriteExpireCond bool @@ -82,7 +83,9 @@ type SQLBuilder struct { // NewSQLBuilder creates a new TTLSQLBuilder func NewSQLBuilder(tbl *PhysicalTable) *SQLBuilder { - return &SQLBuilder{tbl: tbl, state: writeBegin} + b := &SQLBuilder{tbl: tbl, state: writeBegin} + b.restoreCtx = format.NewRestoreCtx(format.DefaultRestoreFlags, &b.sb) + return b } // Build builds the final sql @@ -108,14 +111,16 @@ func (b *SQLBuilder) WriteSelect() error { if b.state != writeBegin { return errors.Errorf("invalid state: %v", b.state) } - b.sb.WriteString("SELECT LOW_PRIORITY ") + b.restoreCtx.WritePlain("SELECT LOW_PRIORITY ") b.writeColNames(b.tbl.KeyColumns, false) - b.sb.WriteString(" FROM ") - b.writeTblName() + b.restoreCtx.WritePlain(" FROM ") + if 
err := b.writeTblName(); err != nil { + return err + } if par := b.tbl.PartitionDef; par != nil { - b.sb.WriteString(" PARTITION(`") - b.sb.WriteString(par.Name.O) - b.sb.WriteString("`)") + b.restoreCtx.WritePlain(" PARTITION(") + b.restoreCtx.WriteName(par.Name.O) + b.restoreCtx.WritePlain(")") } b.state = writeSelOrDel b.isReadOnly = true @@ -127,12 +132,14 @@ func (b *SQLBuilder) WriteDelete() error { if b.state != writeBegin { return errors.Errorf("invalid state: %v", b.state) } - b.sb.WriteString("DELETE LOW_PRIORITY FROM ") - b.writeTblName() + b.restoreCtx.WritePlain("DELETE LOW_PRIORITY FROM ") + if err := b.writeTblName(); err != nil { + return err + } if par := b.tbl.PartitionDef; par != nil { - b.sb.WriteString(" PARTITION(`") - b.sb.WriteString(par.Name.O) - b.sb.WriteString("`)") + b.restoreCtx.WritePlain(" PARTITION(") + b.restoreCtx.WriteName(par.Name.O) + b.restoreCtx.WritePlain(")") } b.state = writeSelOrDel return nil @@ -142,18 +149,18 @@ func (b *SQLBuilder) WriteDelete() error { func (b *SQLBuilder) WriteCommonCondition(cols []*model.ColumnInfo, op string, dp []types.Datum) error { switch b.state { case writeSelOrDel: - b.sb.WriteString(" WHERE ") + b.restoreCtx.WritePlain(" WHERE ") b.state = writeWhere case writeWhere: - b.sb.WriteString(" AND ") + b.restoreCtx.WritePlain(" AND ") default: return errors.Errorf("invalid state: %v", b.state) } b.writeColNames(cols, len(cols) > 1) - b.sb.WriteRune(' ') - b.sb.WriteString(op) - b.sb.WriteRune(' ') + b.restoreCtx.WritePlain(" ") + b.restoreCtx.WritePlain(op) + b.restoreCtx.WritePlain(" ") return b.writeDataPoint(cols, dp) } @@ -161,19 +168,19 @@ func (b *SQLBuilder) WriteCommonCondition(cols []*model.ColumnInfo, op string, d func (b *SQLBuilder) WriteExpireCondition(expire time.Time) error { switch b.state { case writeSelOrDel: - b.sb.WriteString(" WHERE ") + b.restoreCtx.WritePlain(" WHERE ") b.state = writeWhere case writeWhere: - b.sb.WriteString(" AND ") + b.restoreCtx.WritePlain(" AND ") 
default: return errors.Errorf("invalid state: %v", b.state) } b.writeColNames([]*model.ColumnInfo{b.tbl.TimeColumn}, false) - b.sb.WriteString(" < ") - b.sb.WriteString("'") - b.sb.WriteString(expire.Format("2006-01-02 15:04:05.999999")) - b.sb.WriteString("'") + b.restoreCtx.WritePlain(" < ") + b.restoreCtx.WritePlain("'") + b.restoreCtx.WritePlain(expire.Format("2006-01-02 15:04:05.999999")) + b.restoreCtx.WritePlain("'") b.hasWriteExpireCond = true return nil } @@ -182,29 +189,29 @@ func (b *SQLBuilder) WriteExpireCondition(expire time.Time) error { func (b *SQLBuilder) WriteInCondition(cols []*model.ColumnInfo, dps ...[]types.Datum) error { switch b.state { case writeSelOrDel: - b.sb.WriteString(" WHERE ") + b.restoreCtx.WritePlain(" WHERE ") b.state = writeWhere case writeWhere: - b.sb.WriteString(" AND ") + b.restoreCtx.WritePlain(" AND ") default: return errors.Errorf("invalid state: %v", b.state) } b.writeColNames(cols, len(cols) > 1) - b.sb.WriteString(" IN ") - b.sb.WriteRune('(') + b.restoreCtx.WritePlain(" IN ") + b.restoreCtx.WritePlain("(") first := true for _, v := range dps { if first { first = false } else { - b.sb.WriteString(", ") + b.restoreCtx.WritePlain(", ") } if err := b.writeDataPoint(cols, v); err != nil { return err } } - b.sb.WriteRune(')') + b.restoreCtx.WritePlain(")") return nil } @@ -214,12 +221,12 @@ func (b *SQLBuilder) WriteOrderBy(cols []*model.ColumnInfo, desc bool) error { return errors.Errorf("invalid state: %v", b.state) } b.state = writeOrderBy - b.sb.WriteString(" ORDER BY ") + b.restoreCtx.WritePlain(" ORDER BY ") b.writeColNames(cols, false) if desc { - b.sb.WriteString(" DESC") + b.restoreCtx.WritePlain(" DESC") } else { - b.sb.WriteString(" ASC") + b.restoreCtx.WritePlain(" ASC") } return nil } @@ -230,28 +237,23 @@ func (b *SQLBuilder) WriteLimit(n int) error { return errors.Errorf("invalid state: %v", b.state) } b.state = writeLimit - b.sb.WriteString(" LIMIT ") - b.sb.WriteString(strconv.Itoa(n)) + 
b.restoreCtx.WritePlain(" LIMIT ") + b.restoreCtx.WritePlain(strconv.Itoa(n)) return nil } -func (b *SQLBuilder) writeTblName() { - b.sb.WriteRune('`') - b.sb.WriteString(b.tbl.Schema.O) - b.sb.WriteString("`.`") - b.sb.WriteString(b.tbl.Name.O) - b.sb.WriteRune('`') +func (b *SQLBuilder) writeTblName() error { + tn := ast.TableName{Schema: b.tbl.Schema, Name: b.tbl.Name} + return tn.Restore(b.restoreCtx) } func (b *SQLBuilder) writeColName(col *model.ColumnInfo) { - b.sb.WriteRune('`') - b.sb.WriteString(col.Name.O) - b.sb.WriteRune('`') + b.restoreCtx.WriteName(col.Name.O) } func (b *SQLBuilder) writeColNames(cols []*model.ColumnInfo, writeBrackets bool) { if writeBrackets { - b.sb.WriteRune('(') + b.restoreCtx.WritePlain("(") } first := true @@ -259,13 +261,13 @@ func (b *SQLBuilder) writeColNames(cols []*model.ColumnInfo, writeBrackets bool) if first { first = false } else { - b.sb.WriteString(", ") + b.restoreCtx.WritePlain(", ") } b.writeColName(col) } if writeBrackets { - b.sb.WriteRune(')') + b.restoreCtx.WritePlain(")") } } @@ -276,7 +278,7 @@ func (b *SQLBuilder) writeDataPoint(cols []*model.ColumnInfo, dp []types.Datum) } if writeBrackets { - b.sb.WriteRune('(') + b.restoreCtx.WritePlain("(") } first := true @@ -284,15 +286,15 @@ func (b *SQLBuilder) writeDataPoint(cols []*model.ColumnInfo, dp []types.Datum) if first { first = false } else { - b.sb.WriteString(", ") + b.restoreCtx.WritePlain(", ") } - if err := writeDatum(&b.sb, d, &cols[i].FieldType); err != nil { + if err := writeDatum(b.restoreCtx, d, &cols[i].FieldType); err != nil { return err } } if writeBrackets { - b.sb.WriteRune(')') + b.restoreCtx.WritePlain(")") } return nil diff --git a/ttl/sql_test.go b/ttl/sql_test.go index 7efcb3a3790a2..f013d557402b5 100644 --- a/ttl/sql_test.go +++ b/ttl/sql_test.go @@ -22,6 +22,8 @@ import ( "time" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" 
"github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/testkit" @@ -31,6 +33,130 @@ import ( "github.com/stretchr/testify/require" ) +func TestEscape(t *testing.T) { + tb := &ttl.PhysicalTable{ + Schema: model.NewCIStr("testp;\"';123`456"), + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("tp\"';123`456"), + }, + KeyColumns: []*model.ColumnInfo{ + {Name: model.NewCIStr("col1\"';123`456"), FieldType: *types.NewFieldType(mysql.TypeString)}, + }, + TimeColumn: &model.ColumnInfo{ + Name: model.NewCIStr("time\"';123`456"), + FieldType: *types.NewFieldType(mysql.TypeDatetime), + }, + PartitionDef: &model.PartitionDefinition{ + Name: model.NewCIStr("p1\"';123`456"), + }, + } + + buildSelect := func(d []types.Datum) string { + b := ttl.NewSQLBuilder(tb) + require.NoError(t, b.WriteSelect()) + require.NoError(t, b.WriteCommonCondition(tb.KeyColumns, ">", d)) + require.NoError(t, b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) + s, err := b.Build() + require.NoError(t, err) + return s + } + + buildDelete := func(ds ...[]types.Datum) string { + b := ttl.NewSQLBuilder(tb) + require.NoError(t, b.WriteDelete()) + require.NoError(t, b.WriteInCondition(tb.KeyColumns, ds...)) + require.NoError(t, b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) + s, err := b.Build() + require.NoError(t, err) + return s + } + + cases := []struct { + tp string + ds [][]types.Datum + sql string + }{ + { + tp: "select", + ds: [][]types.Datum{d("key1'\";123`456")}, + sql: "SELECT LOW_PRIORITY `col1\"';123``456` FROM `testp;\"';123``456`.`tp\"';123``456` PARTITION(`p1\"';123``456`) WHERE `col1\"';123``456` > 'key1\\'\\\";123`456' AND `time\"';123``456` < '1970-01-01 00:00:00'", + }, + { + tp: "delete", + ds: [][]types.Datum{d("key2'\";123`456")}, + sql: "DELETE LOW_PRIORITY FROM `testp;\"';123``456`.`tp\"';123``456` PARTITION(`p1\"';123``456`) WHERE `col1\"';123``456` IN ('key2\\'\\\";123`456') AND `time\"';123``456` < '1970-01-01 00:00:00'", + }, + { + tp: "delete", + ds: 
[][]types.Datum{d("key3'\";123`456"), d("key4'`\"")}, + sql: "DELETE LOW_PRIORITY FROM `testp;\"';123``456`.`tp\"';123``456` PARTITION(`p1\"';123``456`) WHERE `col1\"';123``456` IN ('key3\\'\\\";123`456', 'key4\\'`\\\"') AND `time\"';123``456` < '1970-01-01 00:00:00'", + }, + } + + for _, c := range cases { + switch c.tp { + case "select": + require.Equal(t, 1, len(c.ds)) + require.Equal(t, c.sql, buildSelect(c.ds[0])) + case "delete": + require.Equal(t, c.sql, buildDelete(c.ds...)) + default: + require.FailNow(t, "invalid tp: %s", c.tp) + } + + p := parser.New() + stmts, _, err := p.Parse(c.sql, "", "") + require.Equal(t, 1, len(stmts)) + require.NoError(t, err) + + var tbName *ast.TableName + var keyColumnName, timeColumnName string + var values []string + var timeString string + switch c.tp { + case "select": + stmt, ok := stmts[0].(*ast.SelectStmt) + require.True(t, ok) + tbName = stmt.From.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName) + and := stmt.Where.(*ast.BinaryOperationExpr) + cond1 := and.L.(*ast.BinaryOperationExpr) + keyColumnName = cond1.L.(*ast.ColumnNameExpr).Name.Name.O + values = []string{cond1.R.(ast.ValueExpr).GetValue().(string)} + cond2 := and.R.(*ast.BinaryOperationExpr) + timeColumnName = cond2.L.(*ast.ColumnNameExpr).Name.Name.O + timeString = cond2.R.(ast.ValueExpr).GetValue().(string) + case "delete": + stmt, ok := stmts[0].(*ast.DeleteStmt) + require.True(t, ok) + tbName = stmt.TableRefs.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName) + and := stmt.Where.(*ast.BinaryOperationExpr) + cond1 := and.L.(*ast.PatternInExpr) + keyColumnName = cond1.Expr.(*ast.ColumnNameExpr).Name.Name.O + require.Equal(t, len(c.ds), len(cond1.List)) + values = make([]string, 0, len(c.ds)) + for _, expr := range cond1.List { + values = append(values, expr.(ast.ValueExpr).GetValue().(string)) + } + cond2 := and.R.(*ast.BinaryOperationExpr) + timeColumnName = cond2.L.(*ast.ColumnNameExpr).Name.Name.O + timeString = 
cond2.R.(ast.ValueExpr).GetValue().(string) + default: + require.FailNow(t, "invalid tp: %s", c.tp) + } + + require.Equal(t, tb.Schema.O, tbName.Schema.O) + require.Equal(t, tb.Name.O, tbName.Name.O) + require.Equal(t, 1, len(tbName.PartitionNames)) + require.Equal(t, tb.PartitionDef.Name.O, tbName.PartitionNames[0].O) + require.Equal(t, tb.KeyColumns[0].Name.O, keyColumnName) + require.Equal(t, tb.TimeColumn.Name.O, timeColumnName) + for i, row := range c.ds { + require.Equal(t, row[0].GetString(), values[i]) + } + require.Equal(t, "1970-01-01 00:00:00", timeString) + } +} + func TestFormatSQLDatum(t *testing.T) { cases := []struct { ft string From a82b68f42941b1c5435470b7500e34ce46f2b614 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Fri, 25 Nov 2022 16:41:57 +0800 Subject: [PATCH 5/6] Update ttl/BUILD.bazel Co-authored-by: Weizhen Wang --- ttl/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/ttl/BUILD.bazel b/ttl/BUILD.bazel index 8e82a7053df67..dadd37031d75a 100644 --- a/ttl/BUILD.bazel +++ b/ttl/BUILD.bazel @@ -38,4 +38,5 @@ go_test( "@com_github_stretchr_testify//require", "@org_uber_go_goleak//:goleak", ], + flaky = True, ) From bca7ee728327d0ffe94e0e589f202bc0f181a066 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Fri, 25 Nov 2022 17:49:21 +0800 Subject: [PATCH 6/6] update --- ttl/BUILD.bazel | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ttl/BUILD.bazel b/ttl/BUILD.bazel index dadd37031d75a..e6a76c69d8df5 100644 --- a/ttl/BUILD.bazel +++ b/ttl/BUILD.bazel @@ -26,9 +26,12 @@ go_test( "main_test.go", "sql_test.go", ], + flaky = True, deps = [ ":ttl", "//kv", + "//parser", + "//parser/ast", "//parser/model", "//parser/mysql", "//testkit", @@ -38,5 +41,4 @@ go_test( "@com_github_stretchr_testify//require", "@org_uber_go_goleak//:goleak", ], - flaky = True, )