Skip to content

Commit

Permalink
Merge branch 'master' into swicthForPCLImit
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Jan 30, 2023
2 parents ba5b77e + e11cbec commit 07f0413
Show file tree
Hide file tree
Showing 33 changed files with 675 additions and 25 deletions.
8 changes: 8 additions & 0 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3896,6 +3896,14 @@ def go_deps():
sum = "h1:CZ7eSOd3kZoaYDLbXnmzgQI5RlciuXBMA+18HwHRfZQ=",
version = "v1.12.0",
)
go_repository(
name = "com_github_spkg_bom",
build_file_proto_mode = "disable",
importpath = "github.com/spkg/bom",
sum = "h1:S939THe0ukL5WcTGiGqkgtaW5JW+O6ITaIlpJXTYY64=",
version = "v1.0.0",
)

go_repository(
name = "com_github_ssgreg_nlreturn_v2",
build_file_proto_mode = "disable",
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
retryTimeout = 3 * time.Second

defaultMaxRetry = 3

dbTimeout = 30 * time.Second
)

// MySQLConnectParam records the parameters needed to connect to a MySQL database.
Expand Down Expand Up @@ -74,6 +76,8 @@ func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config {
cfg.Params["charset"] = "utf8mb4"
cfg.Params["sql_mode"] = fmt.Sprintf("'%s'", param.SQLMode)
cfg.MaxAllowedPacket = int(param.MaxAllowedPacket)
cfg.ReadTimeout = dbTimeout
cfg.WriteTimeout = dbTimeout

cfg.TLS = param.TLSConfig
cfg.AllowFallbackToPlaintext = param.AllowFallbackToPlaintext
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ go_library(
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_pingcap_errors//:errors",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//keepalive",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ func TestLoadConfig(t *testing.T) {
err = taskCfg.Adjust(context.Background())
require.NoError(t, err)
equivalentDSN := taskCfg.Checkpoint.MySQLParam.ToDriverConfig().FormatDSN()
expectedDSN := "guest:12345@tcp(172.16.30.11:4001)/?maxAllowedPacket=67108864&charset=utf8mb4&sql_mode=%27ONLY_FULL_GROUP_BY%2CSTRICT_TRANS_TABLES%2CNO_ZERO_IN_DATE%2CNO_ZERO_DATE%2CERROR_FOR_DIVISION_BY_ZERO%2CNO_AUTO_CREATE_USER%2CNO_ENGINE_SUBSTITUTION%27"
expectedDSN := "guest:12345@tcp(172.16.30.11:4001)/?readTimeout=30s&writeTimeout=30s&maxAllowedPacket=67108864&charset=utf8mb4&sql_mode=%27ONLY_FULL_GROUP_BY%2CSTRICT_TRANS_TABLES%2CNO_ZERO_IN_DATE%2CNO_ZERO_DATE%2CERROR_FOR_DIVISION_BY_ZERO%2CNO_AUTO_CREATE_USER%2CNO_ENGINE_SUBSTITUTION%27"
require.Equal(t, expectedDSN, equivalentDSN)

result := taskCfg.String()
Expand Down
12 changes: 12 additions & 0 deletions br/pkg/lightning/config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package config

import (
"time"

"github.com/docker/go-units"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

const (
Expand All @@ -34,3 +38,11 @@ const (

DefaultBatchSize ByteSize = 100 * units.GiB
)

var (
DefaultGrpcKeepaliveParams = grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 20 * time.Second,
PermitWithoutStream: false,
})
)
1 change: 1 addition & 0 deletions br/pkg/lightning/mydump/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//util/slice",
"//util/table-filter",
"@com_github_pingcap_errors//:errors",
"@com_github_spkg_bom//:bom",
"@com_github_xitongsys_parquet_go//parquet",
"@com_github_xitongsys_parquet_go//reader",
"@com_github_xitongsys_parquet_go//source",
Expand Down
9 changes: 8 additions & 1 deletion br/pkg/lightning/mydump/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/worker"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/spkg/bom"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -285,7 +286,13 @@ func (parser *blockParser) readBlock() error {
parser.remainBuf.Write(parser.buf)
parser.appendBuf.Reset()
parser.appendBuf.Write(parser.remainBuf.Bytes())
parser.appendBuf.Write(parser.blockBuf[:n])
blockData := parser.blockBuf[:n]
if parser.pos == 0 {
bomCleanedData := bom.Clean(blockData)
parser.pos += int64(n - len(bomCleanedData))
blockData = bomCleanedData
}
parser.appendBuf.Write(blockData)
parser.buf = parser.appendBuf.Bytes()
if parser.metrics != nil {
parser.metrics.ChunkParserReadBlockSecondsHistogram.Observe(time.Since(startTime).Seconds())
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/mydump/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/worker"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/spkg/bom"
"go.uber.org/zap"
"golang.org/x/text/encoding/simplifiedchinese"
)
Expand Down Expand Up @@ -83,7 +84,7 @@ func ExportStatement(ctx context.Context, store storage.ExternalStorage, sqlFile
}
defer fd.Close()

br := bufio.NewReader(fd)
br := bufio.NewReader(bom.NewReader(fd))

data := make([]byte, 0, sqlFile.FileMeta.FileSize+1)
buffer := make([]byte, 0, sqlFile.FileMeta.FileSize+1)
Expand Down
1 change: 0 additions & 1 deletion br/pkg/lightning/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ go_library(
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//keepalive",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",
Expand Down
7 changes: 1 addition & 6 deletions br/pkg/lightning/restore/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

type clusterResourceCheckItem struct {
Expand Down Expand Up @@ -733,11 +732,7 @@ func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client,
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: false,
}),
config.DefaultGrpcKeepaliveParams,
grpc.WithBlock(),
grpc.WithReturnConnectionError(),
},
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/tikv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/pdutil",
"//br/pkg/version",
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/lightning/tikv/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/debugpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/version"
Expand Down Expand Up @@ -88,7 +89,7 @@ func withTiKVConnection(ctx context.Context, tls *common.TLS, tikvAddr string, a
// Connect to the ImportSST service on the given TiKV node.
// The connection is needed for executing `action` and will be tear down
// when this function exits.
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption())
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption(), config.DefaultGrpcKeepaliveParams)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -172,7 +173,8 @@ var fetchModeRegexp = regexp.MustCompile(`\btikv_config_rocksdb\{cf="default",na

// FetchMode obtains the import mode status of the TiKV node.
func FetchMode(ctx context.Context, tls *common.TLS, tikvAddr string) (import_sstpb.SwitchMode, error) {
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption())
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption(),
config.DefaultGrpcKeepaliveParams)
if err != nil {
return 0, err
}
Expand Down
1 change: 1 addition & 0 deletions br/pkg/restore/split/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//br/pkg/conn/util",
"//br/pkg/errors",
"//br/pkg/httputil",
"//br/pkg/lightning/config",
"//br/pkg/logutil",
"//br/pkg/redact",
"//br/pkg/utils",
Expand Down
8 changes: 6 additions & 2 deletions br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/br/pkg/conn/util"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/httputil"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/store/pdtypes"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -201,7 +202,9 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key
if err != nil {
return nil, errors.Trace(err)
}
conn, err := grpc.Dial(store.GetAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.Dial(store.GetAddress(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
config.DefaultGrpcKeepaliveParams)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -341,7 +344,8 @@ func sendSplitRegionRequest(ctx context.Context, c *pdClient, regionInfo *Region
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
conn, err := grpc.Dial(store.GetAddress(), opt,
config.DefaultGrpcKeepaliveParams)
if err != nil {
return false, nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions br/tests/lightning_bom_file/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[mydumper.csv]
header = true
5 changes: 5 additions & 0 deletions br/tests/lightning_bom_file/data/mytest.testtbl-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE testtbl (
id INTEGER,
val1 VARCHAR(40) NOT NULL,
INDEX `idx_val1` (`val1`)
);
6 changes: 6 additions & 0 deletions br/tests/lightning_bom_file/data/mytest.testtbl.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
id,val1
1,"aaa01"
2,"aaa01"
3,"aaa02"
4,"aaa02"
5,"aaa05"
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE testtbl (
id INTEGER,
val1 VARCHAR(40) NOT NULL,
INDEX `idx_val1` (`val1`)
);
6 changes: 6 additions & 0 deletions br/tests/lightning_bom_file/original_data/mytest.testtbl.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
id,val1
1,"aaa01"
2,"aaa01"
3,"aaa02"
4,"aaa02"
5,"aaa05"
56 changes: 56 additions & 0 deletions br/tests/lightning_bom_file/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/bin/sh
#
# Copyright 2023 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eux

mydir=$(dirname "${BASH_SOURCE[0]}")

original_schema_file="${mydir}/original_data/mytest.testtbl-schema.sql"
original_data_file="${mydir}/original_data/mytest.testtbl.csv"
schema_file="${original_schema_file/original_data/data}"
data_file="${original_data_file/original_data/data}"

# add the BOM header
printf '\xEF\xBB\xBF' | cat - <( sed '1s/^\xEF\xBB\xBF//' "${original_schema_file}" ) > "${schema_file}"
printf '\xEF\xBB\xBF' | cat - <( sed '1s/^\xEF\xBB\xBF//' "${original_data_file}" ) > "${data_file}"

# verify the BOM header
if ! grep -q $'^\xEF\xBB\xBF' "${schema_file}"; then
echo "schema file doesn't contain the BOM header" >&2
exit 1
fi

if ! grep -q $'^\xEF\xBB\xBF' "${data_file}"; then
echo "data file doesn't contain the BOM header" >&2
exit 1
fi

row_count=$( sed '1d' "${data_file}" | wc -l | xargs echo )

run_lightning --backend tidb

# Check that everything is correctly imported
run_sql 'SELECT count(*) FROM mytest.testtbl'
check_contains "count(*): ${row_count}"

check_cluster_version 4 0 0 'local backend' || exit 0
run_sql "DROP TABLE mytest.testtbl"

run_lightning --backend local

# Check that everything is correctly imported
run_sql 'SELECT count(*) FROM mytest.testtbl'
check_contains "count(*): ${row_count}"
4 changes: 3 additions & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,9 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo
done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo)
if done {
job.MarkNonRevertible()
if err == nil {
ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true)
}
}
// We need another round to wait for all the others sub-jobs to finish.
return false, ver, err
Expand Down Expand Up @@ -877,7 +880,6 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
return false, ver, err
}
indexInfo.BackfillState = model.BackfillStateInapplicable // Prevent double-write on this index.
ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true)
return true, ver, err
default:
return false, 0, dbterror.ErrInvalidDDLState.GenWithStackByArgs("backfill", indexInfo.BackfillState)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ require (
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
github.com/spkg/bom v1.0.0
github.com/stathat/consistent v1.0.0
github.com/stretchr/testify v1.8.1
github.com/tdakkota/asciicheck v0.1.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/spkg/bom v1.0.0 h1:S939THe0ukL5WcTGiGqkgtaW5JW+O6ITaIlpJXTYY64=
github.com/spkg/bom v1.0.0/go.mod h1:lAz2VbTuYNcvs7iaFF8WW0ufXrHShJ7ck1fYFFbVXJs=
github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U=
github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
38 changes: 38 additions & 0 deletions planner/core/indexmerge_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,44 @@ import (
"github.com/stretchr/testify/require"
)

func TestAnalyzeMVIndex(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec(`create table t(a int, b int, c int, j json,
index(a), index(b),
index idx(a, b, (cast(j as signed array)), c),
index idx2(a, b, (cast(j->'$.str' as char(10) array)), c))`)

tk.MustExec("set tidb_analyze_version=2")
tk.MustExec("analyze table t")
tk.MustQuery("show warnings").Sort().Check(testkit.Rows(
"Note 1105 Analyze use auto adjusted sample rate 1.000000 for table test.t",
"Warning 1105 analyzing multi-valued indexes is not supported, skip idx",
"Warning 1105 analyzing multi-valued indexes is not supported, skip idx2"))
tk.MustExec("analyze table t index idx")
tk.MustQuery("show warnings").Sort().Check(testkit.Rows(
"Note 1105 Analyze use auto adjusted sample rate 1.000000 for table test.t",
"Warning 1105 The version 2 would collect all statistics not only the selected indexes",
"Warning 1105 analyzing multi-valued indexes is not supported, skip idx",
"Warning 1105 analyzing multi-valued indexes is not supported, skip idx2"))

tk.MustExec("set tidb_analyze_version=1")
tk.MustExec("analyze table t")
tk.MustQuery("show warnings").Sort().Check(testkit.Rows(
"Warning 1105 analyzing multi-valued indexes is not supported, skip idx",
"Warning 1105 analyzing multi-valued indexes is not supported, skip idx2"))
tk.MustExec("analyze table t index idx")
tk.MustQuery("show warnings").Sort().Check(testkit.Rows(
"Warning 1105 analyzing multi-valued indexes is not supported, skip idx"))
tk.MustExec("analyze table t index a")
tk.MustQuery("show warnings").Sort().Check(testkit.Rows())
tk.MustExec("analyze table t index a, idx, idx2")
tk.MustQuery("show warnings").Sort().Check(testkit.Rows(
"Warning 1105 analyzing multi-valued indexes is not supported, skip idx",
"Warning 1105 analyzing multi-valued indexes is not supported, skip idx2"))
}

func TestIndexMergeJSONMemberOf(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
Loading

0 comments on commit 07f0413

Please sign in to comment.