diff --git a/DEPS.bzl b/DEPS.bzl index 0665d650970ba..f9a9cfe05aad5 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -3896,6 +3896,14 @@ def go_deps(): sum = "h1:CZ7eSOd3kZoaYDLbXnmzgQI5RlciuXBMA+18HwHRfZQ=", version = "v1.12.0", ) + go_repository( + name = "com_github_spkg_bom", + build_file_proto_mode = "disable", + importpath = "github.com/spkg/bom", + sum = "h1:S939THe0ukL5WcTGiGqkgtaW5JW+O6ITaIlpJXTYY64=", + version = "v1.0.0", + ) + go_repository( name = "com_github_ssgreg_nlreturn_v2", build_file_proto_mode = "disable", diff --git a/br/pkg/lightning/common/util.go b/br/pkg/lightning/common/util.go index b9bdf564403de..fbf275a99bfe1 100644 --- a/br/pkg/lightning/common/util.go +++ b/br/pkg/lightning/common/util.go @@ -44,6 +44,8 @@ const ( retryTimeout = 3 * time.Second defaultMaxRetry = 3 + + dbTimeout = 30 * time.Second ) // MySQLConnectParam records the parameters needed to connect to a MySQL database. @@ -74,6 +76,8 @@ func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config { cfg.Params["charset"] = "utf8mb4" cfg.Params["sql_mode"] = fmt.Sprintf("'%s'", param.SQLMode) cfg.MaxAllowedPacket = int(param.MaxAllowedPacket) + cfg.ReadTimeout = dbTimeout + cfg.WriteTimeout = dbTimeout cfg.TLS = param.TLSConfig cfg.AllowFallbackToPlaintext = param.AllowFallbackToPlaintext diff --git a/br/pkg/lightning/config/BUILD.bazel b/br/pkg/lightning/config/BUILD.bazel index b69d2fca0d310..b035b506aebf2 100644 --- a/br/pkg/lightning/config/BUILD.bazel +++ b/br/pkg/lightning/config/BUILD.bazel @@ -25,6 +25,8 @@ go_library( "@com_github_docker_go_units//:go-units", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_pingcap_errors//:errors", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//keepalive", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", ], diff --git a/br/pkg/lightning/config/config_test.go b/br/pkg/lightning/config/config_test.go index ea0cff40a04c7..16db98845e80c 100644 --- a/br/pkg/lightning/config/config_test.go +++ b/br/pkg/lightning/config/config_test.go @@ -643,7 +643,7 @@ func TestLoadConfig(t *testing.T) { err = taskCfg.Adjust(context.Background()) require.NoError(t, err) equivalentDSN := taskCfg.Checkpoint.MySQLParam.ToDriverConfig().FormatDSN() - expectedDSN := "guest:12345@tcp(172.16.30.11:4001)/?maxAllowedPacket=67108864&charset=utf8mb4&sql_mode=%27ONLY_FULL_GROUP_BY%2CSTRICT_TRANS_TABLES%2CNO_ZERO_IN_DATE%2CNO_ZERO_DATE%2CERROR_FOR_DIVISION_BY_ZERO%2CNO_AUTO_CREATE_USER%2CNO_ENGINE_SUBSTITUTION%27" + expectedDSN := "guest:12345@tcp(172.16.30.11:4001)/?readTimeout=30s&writeTimeout=30s&maxAllowedPacket=67108864&charset=utf8mb4&sql_mode=%27ONLY_FULL_GROUP_BY%2CSTRICT_TRANS_TABLES%2CNO_ZERO_IN_DATE%2CNO_ZERO_DATE%2CERROR_FOR_DIVISION_BY_ZERO%2CNO_AUTO_CREATE_USER%2CNO_ENGINE_SUBSTITUTION%27" require.Equal(t, expectedDSN, equivalentDSN) result := taskCfg.String() diff --git a/br/pkg/lightning/config/const.go b/br/pkg/lightning/config/const.go index 23a38ac41117d..e114eafd8ea88 100644 --- a/br/pkg/lightning/config/const.go +++ b/br/pkg/lightning/config/const.go @@ -15,7 +15,11 @@ package config import ( + "time" + "github.com/docker/go-units" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) const ( @@ -34,3 +38,11 @@ const ( DefaultBatchSize ByteSize = 100 * units.GiB ) + +var ( + DefaultGrpcKeepaliveParams = grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 20 * time.Second, + PermitWithoutStream: false, + }) +) diff --git a/br/pkg/lightning/mydump/BUILD.bazel b/br/pkg/lightning/mydump/BUILD.bazel index d265cad78bce6..a4aa1626afc46 100644 --- a/br/pkg/lightning/mydump/BUILD.bazel +++ b/br/pkg/lightning/mydump/BUILD.bazel @@ -32,6 +32,7 @@ go_library( "//util/slice", "//util/table-filter", "@com_github_pingcap_errors//:errors", + "@com_github_spkg_bom//:bom", "@com_github_xitongsys_parquet_go//parquet", "@com_github_xitongsys_parquet_go//reader", "@com_github_xitongsys_parquet_go//source", diff --git a/br/pkg/lightning/mydump/parser.go b/br/pkg/lightning/mydump/parser.go index 512c3789cfa7f..0ac82ce189d71 100644 --- a/br/pkg/lightning/mydump/parser.go +++ b/br/pkg/lightning/mydump/parser.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/worker" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" + "github.com/spkg/bom" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -285,7 +286,13 @@ func (parser *blockParser) readBlock() error { parser.remainBuf.Write(parser.buf) parser.appendBuf.Reset() parser.appendBuf.Write(parser.remainBuf.Bytes()) - parser.appendBuf.Write(parser.blockBuf[:n]) + blockData := parser.blockBuf[:n] + if parser.pos == 0 { + bomCleanedData := bom.Clean(blockData) + parser.pos += int64(n - len(bomCleanedData)) + blockData = bomCleanedData + } + parser.appendBuf.Write(blockData) parser.buf = parser.appendBuf.Bytes() if parser.metrics != nil { parser.metrics.ChunkParserReadBlockSecondsHistogram.Observe(time.Since(startTime).Seconds()) diff --git a/br/pkg/lightning/mydump/reader.go b/br/pkg/lightning/mydump/reader.go index 4837b35aceab2..3735e97cb48ee 100644 --- a/br/pkg/lightning/mydump/reader.go +++ b/br/pkg/lightning/mydump/reader.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/worker" "github.com/pingcap/tidb/br/pkg/storage" + "github.com/spkg/bom" "go.uber.org/zap" "golang.org/x/text/encoding/simplifiedchinese" ) @@ -83,7 +84,7 @@ func ExportStatement(ctx context.Context, store storage.ExternalStorage, sqlFile } defer fd.Close() - br := bufio.NewReader(fd) + br := bufio.NewReader(bom.NewReader(fd)) data := make([]byte, 0, sqlFile.FileMeta.FileSize+1) buffer := make([]byte, 0, sqlFile.FileMeta.FileSize+1) diff --git a/br/pkg/lightning/restore/BUILD.bazel b/br/pkg/lightning/restore/BUILD.bazel index ef5aeb106585b..06e503e0519db 100644 --- a/br/pkg/lightning/restore/BUILD.bazel +++ b/br/pkg/lightning/restore/BUILD.bazel @@ -80,7 +80,6 @@ go_library( "@com_github_tikv_pd_client//:client", "@io_etcd_go_etcd_client_v3//:client", "@org_golang_google_grpc//:grpc", - "@org_golang_google_grpc//keepalive", "@org_golang_x_exp//maps", "@org_golang_x_exp//slices", "@org_golang_x_sync//errgroup", diff --git a/br/pkg/lightning/restore/precheck_impl.go b/br/pkg/lightning/restore/precheck_impl.go index f412b101ff08b..8d5142a8b5fd4 100644 --- a/br/pkg/lightning/restore/precheck_impl.go +++ b/br/pkg/lightning/restore/precheck_impl.go @@ -48,7 +48,6 @@ import ( "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" "google.golang.org/grpc" - "google.golang.org/grpc/keepalive" ) type clusterResourceCheckItem struct { @@ -733,11 +732,7 @@ func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client, AutoSyncInterval: 30 * time.Second, DialTimeout: 5 * time.Second, DialOptions: []grpc.DialOption{ - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: 10 * time.Second, - Timeout: 3 * time.Second, - PermitWithoutStream: false, - }), + config.DefaultGrpcKeepaliveParams, grpc.WithBlock(), grpc.WithReturnConnectionError(), }, diff --git a/br/pkg/lightning/tikv/BUILD.bazel b/br/pkg/lightning/tikv/BUILD.bazel index 596aa52075758..48758bfedaacf 100644 --- a/br/pkg/lightning/tikv/BUILD.bazel +++ b/br/pkg/lightning/tikv/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//br/pkg/lightning/common", + "//br/pkg/lightning/config", "//br/pkg/lightning/log", "//br/pkg/pdutil", "//br/pkg/version", diff --git a/br/pkg/lightning/tikv/tikv.go b/br/pkg/lightning/tikv/tikv.go index 8d2d797d322d1..53c06cc6102f6 100644 --- a/br/pkg/lightning/tikv/tikv.go +++ b/br/pkg/lightning/tikv/tikv.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/debugpb" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/version" @@ -88,7 +89,7 @@ func withTiKVConnection(ctx context.Context, tls *common.TLS, tikvAddr string, a // Connect to the ImportSST service on the given TiKV node. // The connection is needed for executing `action` and will be tear down // when this function exits. - conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption()) + conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption(), config.DefaultGrpcKeepaliveParams) if err != nil { return errors.Trace(err) } @@ -172,7 +173,8 @@ var fetchModeRegexp = regexp.MustCompile(`\btikv_config_rocksdb\{cf="default",na // FetchMode obtains the import mode status of the TiKV node. func FetchMode(ctx context.Context, tls *common.TLS, tikvAddr string) (import_sstpb.SwitchMode, error) { - conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption()) + conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption(), + config.DefaultGrpcKeepaliveParams) if err != nil { return 0, err } diff --git a/br/pkg/restore/split/BUILD.bazel b/br/pkg/restore/split/BUILD.bazel index 1726817092ba8..5ddd7b7671822 100644 --- a/br/pkg/restore/split/BUILD.bazel +++ b/br/pkg/restore/split/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//br/pkg/conn/util", "//br/pkg/errors", "//br/pkg/httputil", + "//br/pkg/lightning/config", "//br/pkg/logutil", "//br/pkg/redact", "//br/pkg/utils", diff --git a/br/pkg/restore/split/client.go b/br/pkg/restore/split/client.go index 5f6788d6ee470..72482a94e87dc 100644 --- a/br/pkg/restore/split/client.go +++ b/br/pkg/restore/split/client.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/br/pkg/conn/util" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/httputil" + "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/store/pdtypes" pd "github.com/tikv/pd/client" @@ -201,7 +202,9 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key if err != nil { return nil, errors.Trace(err) } - conn, err := grpc.Dial(store.GetAddress(), grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.Dial(store.GetAddress(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + config.DefaultGrpcKeepaliveParams) if err != nil { return nil, errors.Trace(err) } @@ -341,7 +344,8 @@ func sendSplitRegionRequest(ctx context.Context, c *pdClient, regionInfo *Region if c.tlsConf != nil { opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf)) } - conn, err := grpc.Dial(store.GetAddress(), opt) + conn, err := grpc.Dial(store.GetAddress(), opt, + config.DefaultGrpcKeepaliveParams) if err != nil { return false, nil, err } diff --git a/br/tests/lightning_bom_file/config.toml b/br/tests/lightning_bom_file/config.toml new file mode 100644 index 0000000000000..291d1b166103a --- /dev/null +++ b/br/tests/lightning_bom_file/config.toml @@ -0,0 +1,2 @@ +[mydumper.csv] +header = true diff --git a/br/tests/lightning_bom_file/data/mytest.testtbl-schema.sql b/br/tests/lightning_bom_file/data/mytest.testtbl-schema.sql new file mode 100644 index 0000000000000..4232788898790 --- /dev/null +++ b/br/tests/lightning_bom_file/data/mytest.testtbl-schema.sql @@ -0,0 +1,5 @@ +CREATE TABLE testtbl ( + id INTEGER, + val1 VARCHAR(40) NOT NULL, + INDEX `idx_val1` (`val1`) +); diff --git a/br/tests/lightning_bom_file/data/mytest.testtbl.csv b/br/tests/lightning_bom_file/data/mytest.testtbl.csv new file mode 100644 index 0000000000000..e0931cce2a480 --- /dev/null +++ b/br/tests/lightning_bom_file/data/mytest.testtbl.csv @@ -0,0 +1,6 @@ +id,val1 +1,"aaa01" +2,"aaa01" +3,"aaa02" +4,"aaa02" +5,"aaa05" diff --git a/br/tests/lightning_bom_file/original_data/mytest.testtbl-schema.sql b/br/tests/lightning_bom_file/original_data/mytest.testtbl-schema.sql new file mode 100644 index 0000000000000..dc1e032a16618 --- /dev/null +++ b/br/tests/lightning_bom_file/original_data/mytest.testtbl-schema.sql @@ -0,0 +1,5 @@ +CREATE TABLE testtbl ( + id INTEGER, + val1 VARCHAR(40) NOT NULL, + INDEX `idx_val1` (`val1`) +); diff --git a/br/tests/lightning_bom_file/original_data/mytest.testtbl.csv b/br/tests/lightning_bom_file/original_data/mytest.testtbl.csv new file mode 100644 index 0000000000000..270c410cd79fd --- /dev/null +++ b/br/tests/lightning_bom_file/original_data/mytest.testtbl.csv @@ -0,0 +1,6 @@ +id,val1 +1,"aaa01" +2,"aaa01" +3,"aaa02" +4,"aaa02" +5,"aaa05" diff --git a/br/tests/lightning_bom_file/run.sh b/br/tests/lightning_bom_file/run.sh new file mode 100755 index 0000000000000..88eada54c74a9 --- /dev/null +++ b/br/tests/lightning_bom_file/run.sh @@ -0,0 +1,56 @@ +#!/bin/sh +# +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux + +mydir=$(dirname "${BASH_SOURCE[0]}") + +original_schema_file="${mydir}/original_data/mytest.testtbl-schema.sql" +original_data_file="${mydir}/original_data/mytest.testtbl.csv" +schema_file="${original_schema_file/original_data/data}" +data_file="${original_data_file/original_data/data}" + +# add the BOM header +printf '\xEF\xBB\xBF' | cat - <( sed '1s/^\xEF\xBB\xBF//' "${original_schema_file}" ) > "${schema_file}" +printf '\xEF\xBB\xBF' | cat - <( sed '1s/^\xEF\xBB\xBF//' "${original_data_file}" ) > "${data_file}" + +# verify the BOM header +if ! grep -q $'^\xEF\xBB\xBF' "${schema_file}"; then + echo "schema file doesn't contain the BOM header" >&2 + exit 1 +fi + +if ! grep -q $'^\xEF\xBB\xBF' "${data_file}"; then + echo "data file doesn't contain the BOM header" >&2 + exit 1 +fi + +row_count=$( sed '1d' "${data_file}" | wc -l | xargs echo ) + +run_lightning --backend tidb + +# Check that everything is correctly imported +run_sql 'SELECT count(*) FROM mytest.testtbl' +check_contains "count(*): ${row_count}" + +check_cluster_version 4 0 0 'local backend' || exit 0 +run_sql "DROP TABLE mytest.testtbl" + +run_lightning --backend local + +# Check that everything is correctly imported +run_sql 'SELECT count(*) FROM mytest.testtbl' +check_contains "count(*): ${row_count}" diff --git a/ddl/index.go b/ddl/index.go index 221005f58c211..cc3a3d3795242 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -791,6 +791,9 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo) if done { job.MarkNonRevertible() + if err == nil { + ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) + } } // We need another round to wait for all the others sub-jobs to finish. return false, ver, err @@ -877,7 +880,6 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo return false, ver, err } indexInfo.BackfillState = model.BackfillStateInapplicable // Prevent double-write on this index. - ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) return true, ver, err default: return false, 0, dbterror.ErrInvalidDDLState.GenWithStackByArgs("backfill", indexInfo.BackfillState) diff --git a/go.mod b/go.mod index 35374deb43ce0..88bc1f0fd6e53 100644 --- a/go.mod +++ b/go.mod @@ -87,6 +87,7 @@ require ( github.com/soheilhy/cmux v0.1.5 github.com/spf13/cobra v1.6.1 github.com/spf13/pflag v1.0.5 + github.com/spkg/bom v1.0.0 github.com/stathat/consistent v1.0.0 github.com/stretchr/testify v1.8.1 github.com/tdakkota/asciicheck v0.1.1 diff --git a/go.sum b/go.sum index 75477ce99b199..d74e95aeb8783 100644 --- a/go.sum +++ b/go.sum @@ -1278,6 +1278,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= +github.com/spkg/bom v1.0.0 h1:S939THe0ukL5WcTGiGqkgtaW5JW+O6ITaIlpJXTYY64= +github.com/spkg/bom v1.0.0/go.mod h1:lAz2VbTuYNcvs7iaFF8WW0ufXrHShJ7ck1fYFFbVXJs= github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U= github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/planner/core/indexmerge_path_test.go b/planner/core/indexmerge_path_test.go index 430171f5bcff7..689893028e937 100644 --- a/planner/core/indexmerge_path_test.go +++ b/planner/core/indexmerge_path_test.go @@ -24,6 +24,44 @@ import ( "github.com/stretchr/testify/require" ) +func TestAnalyzeMVIndex(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`create table t(a int, b int, c int, j json, +index(a), index(b), +index idx(a, b, (cast(j as signed array)), c), +index idx2(a, b, (cast(j->'$.str' as char(10) array)), c))`) + + tk.MustExec("set tidb_analyze_version=2") + tk.MustExec("analyze table t") + tk.MustQuery("show warnings").Sort().Check(testkit.Rows( + "Note 1105 Analyze use auto adjusted sample rate 1.000000 for table test.t", + "Warning 1105 analyzing multi-valued indexes is not supported, skip idx", + "Warning 1105 analyzing multi-valued indexes is not supported, skip idx2")) + tk.MustExec("analyze table t index idx") + tk.MustQuery("show warnings").Sort().Check(testkit.Rows( + "Note 1105 Analyze use auto adjusted sample rate 1.000000 for table test.t", + "Warning 1105 The version 2 would collect all statistics not only the selected indexes", + "Warning 1105 analyzing multi-valued indexes is not supported, skip idx", + "Warning 1105 analyzing multi-valued indexes is not supported, skip idx2")) + + tk.MustExec("set tidb_analyze_version=1") + tk.MustExec("analyze table t") + tk.MustQuery("show warnings").Sort().Check(testkit.Rows( + "Warning 1105 analyzing multi-valued indexes is not supported, skip idx", + "Warning 1105 analyzing multi-valued indexes is not supported, skip idx2")) + tk.MustExec("analyze table t index idx") + tk.MustQuery("show warnings").Sort().Check(testkit.Rows( + "Warning 1105 analyzing multi-valued indexes is not supported, skip idx")) + tk.MustExec("analyze table t index a") + tk.MustQuery("show warnings").Sort().Check(testkit.Rows()) + tk.MustExec("analyze table t index a, idx, idx2") + tk.MustQuery("show warnings").Sort().Check(testkit.Rows( + "Warning 1105 analyzing multi-valued indexes is not supported, skip idx", + "Warning 1105 analyzing multi-valued indexes is not supported, skip idx2")) +} + func TestIndexMergeJSONMemberOf(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 5af0bb004d6c9..d14a6bf51ea49 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -2320,12 +2320,16 @@ func getColOffsetForAnalyze(colsInfo []*model.ColumnInfo, colID int64) int { // in tblInfo.Indices, index.Columns[i].Offset is set according to tblInfo.Columns. Since we decode row samples according to colsInfo rather than tbl.Columns // in the execution phase of ANALYZE, we need to modify index.Columns[i].Offset according to colInfos. // TODO: find a better way to find indexed columns in ANALYZE rather than use IndexColumn.Offset -func getModifiedIndexesInfoForAnalyze(tblInfo *model.TableInfo, allColumns bool, colsInfo []*model.ColumnInfo) []*model.IndexInfo { +func getModifiedIndexesInfoForAnalyze(sctx sessionctx.Context, tblInfo *model.TableInfo, allColumns bool, colsInfo []*model.ColumnInfo) []*model.IndexInfo { idxsInfo := make([]*model.IndexInfo, 0, len(tblInfo.Indices)) for _, originIdx := range tblInfo.Indices { if originIdx.State != model.StatePublic { continue } + if originIdx.MVIndex { + sctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("analyzing multi-valued indexes is not supported, skip %s", originIdx.Name.L)) + continue + } if allColumns { // If all the columns need to be analyzed, we don't need to modify IndexColumn.Offset. idxsInfo = append(idxsInfo, originIdx) @@ -2401,7 +2405,7 @@ func (b *PlanBuilder) buildAnalyzeFullSamplingTask( execColsInfo = colsInfo } allColumns := len(tbl.TableInfo.Columns) == len(execColsInfo) - indexes := getModifiedIndexesInfoForAnalyze(tbl.TableInfo, allColumns, execColsInfo) + indexes := getModifiedIndexesInfoForAnalyze(b.ctx, tbl.TableInfo, allColumns, execColsInfo) handleCols := BuildHandleColsForAnalyze(b.ctx, tbl.TableInfo, allColumns, execColsInfo) newTask := AnalyzeColumnsTask{ HandleCols: handleCols, @@ -2631,6 +2635,10 @@ func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt, opts map[ast.A commonHandleInfo = idx continue } + if idx.MVIndex { + b.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("analyzing multi-valued indexes is not supported, skip %s", idx.Name.L)) + continue + } for i, id := range physicalIDs { if id == tbl.TableInfo.ID { id = -1 @@ -2724,6 +2732,10 @@ func (b *PlanBuilder) buildAnalyzeIndex(as *ast.AnalyzeTableStmt, opts map[ast.A if idx == nil || idx.State != model.StatePublic { return nil, ErrAnalyzeMissIndex.GenWithStackByArgs(idxName.O, tblInfo.Name.O) } + if idx.MVIndex { + b.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("analyzing multi-valued indexes is not supported, skip %s", idx.Name.L)) + continue + } for i, id := range physicalIDs { if id == tblInfo.ID { id = -1 @@ -2766,6 +2778,11 @@ func (b *PlanBuilder) buildAnalyzeAllIndex(as *ast.AnalyzeTableStmt, opts map[as } for _, idx := range tblInfo.Indices { if idx.State == model.StatePublic { + if idx.MVIndex { + b.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("analyzing multi-valued indexes is not supported, skip %s", idx.Name.L)) + continue + } + for i, id := range physicalIDs { if id == tblInfo.ID { id = -1 diff --git a/telemetry/BUILD.bazel b/telemetry/BUILD.bazel index 1f032aa3f237a..a6c79f7de596f 100644 --- a/telemetry/BUILD.bazel +++ b/telemetry/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "id.go", "status.go", "telemetry.go", + "ttl.go", "util.go", ], importpath = "github.com/pingcap/tidb/telemetry", @@ -24,6 +25,7 @@ go_library( "//infoschema", "//kv", "//metrics", + "//parser/ast", "//parser/model", "//parser/mysql", "//sessionctx", diff --git a/telemetry/data_feature_usage.go b/telemetry/data_feature_usage.go index 81bf7a9785a3a..8661ce13ecccb 100644 --- a/telemetry/data_feature_usage.go +++ b/telemetry/data_feature_usage.go @@ -60,6 +60,7 @@ type featureUsage struct { AutoIDNoCache bool `json:"autoIDNoCache"` IndexMergeUsageCounter *m.IndexMergeUsageCounter `json:"indexMergeUsageCounter"` ResourceControlUsage *resourceControlUsage `json:"resourceControl"` + TTLUsage *ttlUsageCounter `json:"ttlUsage"` } type placementPolicyUsage struct { @@ -117,6 +118,8 @@ func getFeatureUsage(ctx context.Context, sctx sessionctx.Context) (*featureUsag usage.IndexMergeUsageCounter = getIndexMergeUsageInfo() + usage.TTLUsage = getTTLUsageInfo(ctx, sctx) + return &usage, nil } diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go index a678bc681eb18..a667219ba50a8 100644 --- a/telemetry/data_feature_usage_test.go +++ b/telemetry/data_feature_usage_test.go @@ -15,12 +15,16 @@ package telemetry_test import ( + "encoding/json" "fmt" + "strings" "testing" + "time" _ "github.com/pingcap/tidb/autoid_service" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/telemetry" "github.com/pingcap/tidb/testkit" @@ -619,3 +623,157 @@ func TestIndexMergeUsage(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(2), usage.IndexMergeUsageCounter.IndexMergeUsed) } + +func TestTTLTelemetry(t *testing.T) { + timeFormat := "2006-01-02 15:04:05" + dateFormat := "2006-01-02" + + now := time.Now() + curDate := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()) + if interval := curDate.Add(time.Hour * 24).Sub(now); interval > 0 && interval < 5*time.Minute { + // make sure testing is not running at the end of one day + time.Sleep(interval) + } + + store, do := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@global.tidb_ttl_job_enable=0") + + getTTLTable := func(name string) *model.TableInfo { + tbl, err := do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr(name)) + require.NoError(t, err) + require.NotNil(t, tbl.Meta().TTLInfo) + return tbl.Meta() + } + + jobIDIdx := 1 + insertTTLHistory := func(tblName string, partitionName string, createTime, finishTime, ttlExpire time.Time, scanError string, totalRows, errorRows int64, status string) { + defer func() { + jobIDIdx++ + }() + + tbl := getTTLTable(tblName) + tblID := tbl.ID + partitionID := tbl.ID + if partitionName != "" { + for _, def := range tbl.Partition.Definitions { + if def.Name.L == strings.ToLower(partitionName) { + partitionID = def.ID + } + } + require.NotEqual(t, tblID, partitionID) + } + + summary := make(map[string]interface{}) + summary["total_rows"] = totalRows + summary["success_rows"] = totalRows - errorRows + summary["error_rows"] = errorRows + summary["total_scan_task"] = 1 + summary["scheduled_scan_task"] = 1 + summary["finished_scan_task"] = 1 + if scanError != "" { + summary["scan_task_err"] = scanError + } + + summaryText, err := json.Marshal(summary) + require.NoError(t, err) + + tk.MustExec("insert into "+ + "mysql.tidb_ttl_job_history ("+ + " job_id, table_id, parent_table_id, table_schema, table_name, partition_name, "+ + " create_time, finish_time, ttl_expire, summary_text, "+ + " expired_rows, deleted_rows, error_delete_rows, status) "+ + "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + jobIDIdx, partitionID, tblID, "test", tblName, partitionName, + createTime.Format(timeFormat), finishTime.Format(timeFormat), ttlExpire.Format(timeFormat), summaryText, + totalRows, totalRows-errorRows, errorRows, status, + ) + } + + oneDayAgoDate := curDate.Add(-24 * time.Hour) + // start today, end today + times11 := []time.Time{curDate.Add(time.Hour), curDate.Add(2 * time.Hour), curDate} + // start yesterday, end today + times21 := []time.Time{curDate.Add(-2 * time.Hour), curDate, curDate.Add(-3 * time.Hour)} + // start yesterday, end yesterday + times31 := []time.Time{oneDayAgoDate, oneDayAgoDate.Add(time.Hour), oneDayAgoDate.Add(-time.Hour)} + times32 := []time.Time{oneDayAgoDate.Add(2 * time.Hour), oneDayAgoDate.Add(3 * time.Hour), oneDayAgoDate.Add(time.Hour)} + times33 := []time.Time{oneDayAgoDate.Add(4 * time.Hour), oneDayAgoDate.Add(5 * time.Hour), oneDayAgoDate.Add(3 * time.Hour)} + // start 2 days ago, end yesterday + times41 := []time.Time{oneDayAgoDate.Add(-2 * time.Hour), oneDayAgoDate.Add(time.Hour), oneDayAgoDate.Add(-3 * time.Hour)} + // start two days ago, end two days ago + times51 := []time.Time{oneDayAgoDate.Add(-5 * time.Hour), oneDayAgoDate.Add(-4 * time.Hour), oneDayAgoDate.Add(-6 * time.Hour)} + + tk.MustExec("create table t1 (t timestamp) TTL=`t` + interval 1 hour") + insertTTLHistory("t1", "", times11[0], times11[1], times11[2], "", 100000000, 0, "finished") + insertTTLHistory("t1", "", times21[0], times21[1], times21[2], "", 100000000, 0, "finished") + insertTTLHistory("t1", "", times31[0], times31[1], times31[2], "err1", 112600, 110000, "finished") + insertTTLHistory("t1", "", times32[0], times32[1], times32[2], "", 2600, 0, "timeout") + insertTTLHistory("t1", "", times33[0], times33[1], times33[2], "", 2600, 0, "finished") + insertTTLHistory("t1", "", times41[0], times41[1], times41[2], "", 2600, 0, "finished") + insertTTLHistory("t1", "", times51[0], times51[1], times51[2], "", 100000000, 1, "finished") + + usage, err := telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + checkTableHistWithDeleteRows := func(vals ...int64) { + require.Equal(t, 5, len(vals)) + require.Equal(t, 5, len(usage.TTLUsage.TableHistWithDeleteRows)) + require.Equal(t, int64(10*1000), *usage.TTLUsage.TableHistWithDeleteRows[0].LessThan) + require.Equal(t, vals[0], usage.TTLUsage.TableHistWithDeleteRows[0].Count) + require.Equal(t, int64(100*1000), *usage.TTLUsage.TableHistWithDeleteRows[1].LessThan) + require.Equal(t, vals[1], usage.TTLUsage.TableHistWithDeleteRows[1].Count) + require.Equal(t, int64(1000*1000), *usage.TTLUsage.TableHistWithDeleteRows[2].LessThan) + require.Equal(t, vals[2], usage.TTLUsage.TableHistWithDeleteRows[2].Count) + require.Equal(t, int64(10*1000*1000), *usage.TTLUsage.TableHistWithDeleteRows[3].LessThan) + require.Equal(t, vals[3], usage.TTLUsage.TableHistWithDeleteRows[3].Count) + require.True(t, usage.TTLUsage.TableHistWithDeleteRows[4].LessThanMax) + require.Nil(t, usage.TTLUsage.TableHistWithDeleteRows[4].LessThan) + require.Equal(t, vals[4], usage.TTLUsage.TableHistWithDeleteRows[4].Count) + } + + checkTableHistWithDelay := func(vals ...int64) { + require.Equal(t, 5, len(vals)) + require.Equal(t, 5, len(usage.TTLUsage.TableHistWithDelayTime)) + require.Equal(t, int64(1), *usage.TTLUsage.TableHistWithDelayTime[0].LessThan) + require.Equal(t, vals[0], usage.TTLUsage.TableHistWithDelayTime[0].Count) + require.Equal(t, int64(6), *usage.TTLUsage.TableHistWithDelayTime[1].LessThan) + require.Equal(t, vals[1], usage.TTLUsage.TableHistWithDelayTime[1].Count) + require.Equal(t, int64(24), *usage.TTLUsage.TableHistWithDelayTime[2].LessThan) + require.Equal(t, vals[2], usage.TTLUsage.TableHistWithDelayTime[2].Count) + require.Equal(t, int64(72), *usage.TTLUsage.TableHistWithDelayTime[3].LessThan) + require.Equal(t, vals[3], usage.TTLUsage.TableHistWithDelayTime[3].Count) + require.True(t, usage.TTLUsage.TableHistWithDelayTime[4].LessThanMax) + require.Nil(t, usage.TTLUsage.TableHistWithDelayTime[4].LessThan) + require.Equal(t, vals[4], usage.TTLUsage.TableHistWithDelayTime[4].Count) + } + + require.False(t, usage.TTLUsage.TTLJobEnabled) + require.Equal(t, int64(1), usage.TTLUsage.TTLTables) + require.Equal(t, int64(1), usage.TTLUsage.TTLJobEnabledTables) + require.Equal(t, oneDayAgoDate.Format(dateFormat), usage.TTLUsage.TTLHistDate) + checkTableHistWithDeleteRows(0, 1, 0, 0, 0) + checkTableHistWithDelay(0, 0, 1, 0, 0) + + tk.MustExec("create table t2 (t timestamp) TTL=`t` + interval 20 hour") + tk.MustExec("set @@global.tidb_ttl_job_enable=1") + insertTTLHistory("t2", "", times31[0], times31[1], times31[2], "", 9999, 0, "finished") + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.True(t, usage.TTLUsage.TTLJobEnabled) + require.Equal(t, int64(2), usage.TTLUsage.TTLTables) + require.Equal(t, int64(2), usage.TTLUsage.TTLJobEnabledTables) + require.Equal(t, oneDayAgoDate.Format(dateFormat), usage.TTLUsage.TTLHistDate) + checkTableHistWithDeleteRows(1, 1, 0, 0, 0) + checkTableHistWithDelay(0, 1, 1, 0, 0) + + tk.MustExec("create table t3 (t timestamp) TTL=`t` + interval 1 hour TTL_ENABLE='OFF'") + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.True(t, usage.TTLUsage.TTLJobEnabled) + require.Equal(t, int64(3), usage.TTLUsage.TTLTables) + require.Equal(t, int64(2), usage.TTLUsage.TTLJobEnabledTables) + require.Equal(t, oneDayAgoDate.Format(dateFormat), usage.TTLUsage.TTLHistDate) + checkTableHistWithDeleteRows(1, 1, 0, 0, 0) + checkTableHistWithDelay(0, 1, 1, 0, 1) +} diff --git a/telemetry/main_test.go b/telemetry/main_test.go index 0e8d98b2a4f6c..8478a3ead4084 100644 --- a/telemetry/main_test.go +++ b/telemetry/main_test.go @@ -41,6 +41,8 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"), + goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), } goleak.VerifyTestMain(m, opts...) diff --git a/telemetry/ttl.go b/telemetry/ttl.go new file mode 100644 index 0000000000000..b9c8c0210fb0c --- /dev/null +++ b/telemetry/ttl.go @@ -0,0 +1,214 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package telemetry + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/sqlexec" + "go.uber.org/zap" +) + +const ( + // selectDeletedRowsOneDaySQL selects the deleted rows for each table of last day + selectDeletedRowsOneDaySQL = `SELECT parent_table_id, CAST(SUM(deleted_rows) AS SIGNED) + FROM + mysql.tidb_ttl_job_history + WHERE + create_time >= CURDATE() - INTERVAL 7 DAY + AND finish_time >= CURDATE() - INTERVAL 1 DAY + AND finish_time < CURDATE() + GROUP BY parent_table_id;` + // selectDelaySQL selects the deletion delay in minute for each table at the end of last day + selectDelaySQL = `SELECT + parent_table_id, TIMESTAMPDIFF(MINUTE, MIN(tm), CURDATE()) AS ttl_minutes + FROM + ( + SELECT + table_id, + parent_table_id, + MAX(ttl_expire) AS tm + FROM + mysql.tidb_ttl_job_history + WHERE + create_time > CURDATE() - INTERVAL 7 DAY + AND finish_time < CURDATE() + AND status = 'finished' + AND JSON_VALID(summary_text) + AND summary_text ->> "$.scan_task_err" IS NULL + GROUP BY + table_id, parent_table_id + ) t + GROUP BY parent_table_id;` +) + +type ttlHistItem struct { + // LessThan is not null means it collects the count of items with condition [prevLessThan, LessThan) + // Notice that it's type is an int64 pointer to forbid serializing it when it is not set. + LessThan *int64 `json:"less_than,omitempty"` + // LessThanMax is true means the condition is [prevLessThan, MAX) + LessThanMax bool `json:"less_than_max,omitempty"` + // Count is the count of items that fit the condition + Count int64 `json:"count"` +} + +type ttlUsageCounter struct { + TTLJobEnabled bool `json:"ttl_job_enabled"` + TTLTables int64 `json:"ttl_table_count"` + TTLJobEnabledTables int64 `json:"ttl_job_enabled_tables"` + TTLHistDate string `json:"ttl_hist_date"` + TableHistWithDeleteRows []*ttlHistItem `json:"table_hist_with_delete_rows"` + TableHistWithDelayTime []*ttlHistItem `json:"table_hist_with_delay_time"` +} + +func int64Pointer(val int64) *int64 { + v := val + return &v +} + +func (c *ttlUsageCounter) UpdateTableHistWithDeleteRows(rows int64) { + for _, item := range c.TableHistWithDeleteRows { + if item.LessThanMax || rows < *item.LessThan { + item.Count++ + return + } + } +} + +func (c *ttlUsageCounter) UpdateTableHistWithDelayTime(tblCnt int, hours int64) { + for _, item := range c.TableHistWithDelayTime { + if item.LessThanMax || hours < *item.LessThan { + item.Count += int64(tblCnt) + return + } + } +} + +func getTTLUsageInfo(ctx context.Context, sctx sessionctx.Context) (counter *ttlUsageCounter) { + counter = &ttlUsageCounter{ + TTLJobEnabled: variable.EnableTTLJob.Load(), + TTLHistDate: time.Now().Add(-24 * time.Hour).Format("2006-01-02"), + TableHistWithDeleteRows: []*ttlHistItem{ + { + LessThan: int64Pointer(10 * 1000), + }, + { + LessThan: int64Pointer(100 * 1000), + }, + { + LessThan: int64Pointer(1000 * 1000), + }, + { + LessThan: int64Pointer(10000 * 1000), + }, + { + LessThanMax: true, + }, + }, + TableHistWithDelayTime: []*ttlHistItem{ + { + LessThan: int64Pointer(1), + }, + { + LessThan: int64Pointer(6), + }, + { + LessThan: int64Pointer(24), + }, + { + LessThan: int64Pointer(72), + }, + { + LessThanMax: true, + }, + }, + } + + is, ok := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) + if !ok { + // it should never happen + logutil.BgLogger().Error(fmt.Sprintf("GetDomainInfoSchema returns a invalid type: %T", is)) + return + } + + ttlTables := make(map[int64]*model.TableInfo) + for _, db := range is.AllSchemas() { + for _, tbl := range is.SchemaTables(db.Name) { + tblInfo := tbl.Meta() + if tblInfo.State != model.StatePublic || tblInfo.TTLInfo == nil { + continue + } + + counter.TTLTables++ + if tblInfo.TTLInfo.Enable { + counter.TTLJobEnabledTables++ + } + ttlTables[tblInfo.ID] = tblInfo + } + } + + exec := sctx.(sqlexec.RestrictedSQLExecutor) + rows, _, err := exec.ExecRestrictedSQL(ctx, nil, selectDeletedRowsOneDaySQL) + if err != nil { + logutil.BgLogger().Error("exec sql error", zap.String("SQL", selectDeletedRowsOneDaySQL), zap.Error(err)) + } else { + for _, row := range rows { + counter.UpdateTableHistWithDeleteRows(row.GetInt64(1)) + } + } + + rows, _, err = exec.ExecRestrictedSQL(ctx, nil, selectDelaySQL) + if err != nil { + logutil.BgLogger().Error("exec sql error", zap.String("SQL", selectDelaySQL), zap.Error(err)) + } else { + noHistoryTables := len(ttlTables) + for _, row := range rows { + tblID := row.GetInt64(0) + tbl, ok := ttlTables[tblID] + if !ok { + // table not exist, maybe truncated or deleted + continue + } + noHistoryTables-- + + evalIntervalSQL := fmt.Sprintf( + "SELECT TIMESTAMPDIFF(HOUR, CURDATE() - INTERVAL %d MINUTE, CURDATE() - INTERVAL %s %s)", + row.GetInt64(1), tbl.TTLInfo.IntervalExprStr, ast.TimeUnitType(tbl.TTLInfo.IntervalTimeUnit).String(), + ) + + innerRows, _, err := exec.ExecRestrictedSQL(ctx, nil, evalIntervalSQL) + if err != nil || len(innerRows) == 0 { + logutil.BgLogger().Error("exec sql error or empty rows returned", zap.String("SQL", evalIntervalSQL), zap.Error(err)) + continue + } + + hours := innerRows[0].GetInt64(0) + counter.UpdateTableHistWithDelayTime(1, hours) + } + + // When no history found for a table, use max delay + counter.UpdateTableHistWithDelayTime(noHistoryTables, math.MaxInt64) + } + return +} diff --git a/ttl/ttlworker/config.go b/ttl/ttlworker/config.go index 468150c3949a7..89ca9eedae010 100644 --- a/ttl/ttlworker/config.go +++ b/ttl/ttlworker/config.go @@ -32,7 +32,7 @@ const ttlJobTimeout = 6 * time.Hour const taskManagerLoopTickerInterval = time.Minute const ttlTaskHeartBeatTickerInterval = time.Minute -const ttlTaskGCInterval = time.Hour +const ttlGCInterval = time.Hour func getUpdateInfoSchemaCacheInterval() time.Duration { failpoint.Inject("update-info-schema-cache-interval", func(val failpoint.Value) time.Duration { diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 132be2e626cde..0b427e64318ac 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -58,6 +58,8 @@ const taskGCTemplate = `DELETE task FROM ON task.job_id = job.current_job_id WHERE job.table_id IS NULL` +const ttlJobHistoryGCTemplate = `DELETE FROM mysql.tidb_ttl_job_history WHERE create_time < CURDATE() - INTERVAL 90 DAY` + const timeFormat = "2006-01-02 15:04:05" func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, []interface{}) { @@ -143,7 +145,7 @@ func (m *JobManager) jobLoop() error { infoSchemaCacheUpdateTicker := time.Tick(m.infoSchemaCache.GetInterval()) tableStatusCacheUpdateTicker := time.Tick(m.tableStatusCache.GetInterval()) resizeWorkersTicker := time.Tick(getResizeWorkersInterval()) - taskGC := time.Tick(jobManagerLoopTickerInterval) + gcTicker := time.Tick(ttlGCInterval) scheduleJobTicker := time.Tick(jobManagerLoopTickerInterval) jobCheckTicker := time.Tick(jobManagerLoopTickerInterval) @@ -175,12 +177,9 @@ func (m *JobManager) jobLoop() error { if err != nil { logutil.Logger(m.ctx).Warn("fail to update table status cache", zap.Error(err)) } - case <-taskGC: - taskGCCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) - _, err = se.ExecuteSQL(taskGCCtx, taskGCTemplate) - if err != nil { - logutil.Logger(m.ctx).Warn("fail to gc redundant scan task", zap.Error(err)) - } + case <-gcTicker: + gcCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) + DoGC(gcCtx, se) cancel() // Job Schedule loop: case <-updateJobHeartBeatTicker: @@ -777,3 +776,14 @@ func summarizeTaskResult(tasks []*cache.TTLTask) (*TTLSummary, error) { summary.SummaryText = string(buf) return summary, nil } + +// DoGC deletes some old TTL job histories and redundant scan tasks +func DoGC(ctx context.Context, se session.Session) { + if _, err := se.ExecuteSQL(ctx, taskGCTemplate); err != nil { + logutil.Logger(ctx).Warn("fail to gc redundant scan task", zap.Error(err)) + } + + if _, err := se.ExecuteSQL(ctx, ttlJobHistoryGCTemplate); err != nil { + logutil.Logger(ctx).Warn("fail to gc ttl job history", zap.Error(err)) + } +} diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go index 82b107b976b70..e2e864344fde3 100644 --- a/ttl/ttlworker/job_manager_integration_test.go +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -490,3 +490,87 @@ func waitAndStopTTLManager(t *testing.T, dom *domain.Domain) { continue } } + +func TestGCScanTasks(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + addTableStatusRecord := func(tableID, parentTableID, curJobID int64) { + tk.MustExec("INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (?, ?)", tableID, parentTableID) + if curJobID == 0 { + return + } + + tk.MustExec(`UPDATE mysql.tidb_ttl_table_status + SET current_job_id = ?, + current_job_owner_id = '12345', + current_job_start_time = NOW(), + current_job_status = 'running', + current_job_status_update_time = NOW(), + current_job_ttl_expire = NOW(), + current_job_owner_hb_time = NOW() + WHERE table_id = ?`, curJobID, tableID) + } + + addScanTaskRecord := func(jobID, tableID, scanID int64) { + tk.MustExec(`INSERT INTO mysql.tidb_ttl_task SET + job_id = ?, + table_id = ?, + scan_id = ?, + expire_time = NOW(), + created_time = NOW()`, jobID, tableID, scanID) + } + + addTableStatusRecord(1, 1, 1) + addScanTaskRecord(1, 1, 1) + addScanTaskRecord(1, 1, 2) + addScanTaskRecord(2, 1, 1) + addScanTaskRecord(2, 1, 2) + addScanTaskRecord(3, 2, 1) + addScanTaskRecord(3, 2, 2) + + se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {}) + ttlworker.DoGC(context.TODO(), se) + tk.MustQuery("select job_id, scan_id from mysql.tidb_ttl_task order by job_id, scan_id asc").Check(testkit.Rows("1 1", "1 2")) +} + +func TestGCTTLHistory(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + addHistory := func(jobID, createdBeforeDays int) { + tk.MustExec(fmt.Sprintf(`INSERT INTO mysql.tidb_ttl_job_history ( + job_id, + table_id, + parent_table_id, + table_schema, + table_name, + partition_name, + create_time, + finish_time, + ttl_expire, + summary_text, + expired_rows, + deleted_rows, + error_delete_rows, + status + ) + VALUES + ( + %d, 1, 1, 'test', 't1', '', + CURDATE() - INTERVAL %d DAY, + CURDATE() - INTERVAL %d DAY + INTERVAL 1 HOUR, + CURDATE() - INTERVAL %d DAY, + "", 100, 100, 0, "finished" + )`, jobID, createdBeforeDays, createdBeforeDays, createdBeforeDays)) + } + + addHistory(1, 1) + addHistory(2, 30) + addHistory(3, 60) + addHistory(4, 89) + addHistory(5, 90) + addHistory(6, 91) + addHistory(7, 100) + se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {}) + ttlworker.DoGC(context.TODO(), se) + tk.MustQuery("select job_id from mysql.tidb_ttl_job_history order by job_id asc").Check(testkit.Rows("1", "2", "3", "4", "5")) +}