Skip to content

Commit

Permalink
Merge branch 'master' into unstable-ci
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Jan 25, 2022
2 parents 9e58dc6 + 373f041 commit c485abc
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 72 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ devgotest: failpoint-enable
$(GOTEST) -ldflags '$(TEST_LDFLAGS)' $(EXTRA_TEST_ARGS) -cover $(PACKAGES_TIDB_TESTS) -check.p true > gotest.log || { $(FAILPOINT_DISABLE); grep -v '^\([[]20\|PASS:\|ok \)' 'gotest.log'; exit 1; }
@$(FAILPOINT_DISABLE)

ut: failpoint-enable tools/bin/ut
ut: failpoint-enable tools/bin/ut tools/bin/xprog
tools/bin/ut $(X);
@$(FAILPOINT_DISABLE)

Expand Down Expand Up @@ -216,6 +216,10 @@ tools/bin/ut: tools/check/ut.go
cd tools/check; \
$(GO) build -o ../bin/ut ut.go

tools/bin/xprog: tools/check/xprog.go
cd tools/check; \
$(GO) build -o ../bin/xprog xprog.go

tools/bin/megacheck: tools/check/go.mod
cd tools/check; \
$(GO) build -o ../bin/megacheck honnef.co/go/tools/cmd/megacheck
Expand Down
12 changes: 5 additions & 7 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,6 @@ func (builder *RequestBuilder) SetPartitionsAndHandles(handles []kv.Handle) *Req
return builder
}

// SetIsolationLevel sets "IsolationLevel" for "kv.Request".
func (builder *RequestBuilder) SetIsolationLevel(level kv.IsoLevel) *RequestBuilder {
builder.Request.IsolationLevel = level
return builder
}

const estimatedRegionRowCount = 100000

// SetDAGRequest sets the request type to "ReqTypeDAG" and construct request data.
Expand Down Expand Up @@ -250,7 +244,11 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
// Concurrency may be set to 1 by SetDAGRequest
builder.Request.Concurrency = sv.DistSQLScanConcurrency()
}
builder.Request.IsolationLevel = builder.getIsolationLevel()
if sv.StmtCtx.WeakConsistency {
builder.Request.IsolationLevel = kv.RC
} else {
builder.Request.IsolationLevel = builder.getIsolationLevel()
}
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
builder.Request.TaskID = sv.StmtCtx.TaskID
builder.Request.Priority = builder.getKVPriority(sv)
Expand Down
3 changes: 0 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3945,9 +3945,6 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T
if err != nil {
return nil, err
}
if builder.ctx.GetSessionVars().StmtCtx.WeakConsistency {
reqBuilderWithRange.SetIsolationLevel(kv.RC)
}
kvReq, err := reqBuilderWithRange.
SetDAGRequest(e.dagPB).
SetStartTS(startTS).
Expand Down
6 changes: 0 additions & 6 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,6 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
var builder distsql.RequestBuilder
if e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
builder.SetIsolationLevel(kv.RC)
}
builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
Expand Down Expand Up @@ -559,9 +556,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
PushedLimit: e.PushedLimit,
}
var builder distsql.RequestBuilder
if e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
builder.SetIsolationLevel(kv.RC)
}
builder.SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
SetDesc(e.desc).
Expand Down
3 changes: 0 additions & 3 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,6 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
}

var builder distsql.RequestBuilder
if e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
builder.SetIsolationLevel(kv.RC)
}
builder.SetDAGRequest(e.dagPBs[workID]).
SetStartTS(e.startTS).
SetDesc(e.descs[workID]).
Expand Down
6 changes: 0 additions & 6 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,6 @@ func (e *TableReaderExecutor) buildKVReqSeparately(ctx context.Context, ranges [
return nil, err
}
var builder distsql.RequestBuilder
if e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
builder.SetIsolationLevel(kv.RC)
}
reqBuilder := builder.SetKeyRanges(kvRange)
kvReq, err := reqBuilder.
SetDAGRequest(e.dagPB).
Expand Down Expand Up @@ -357,9 +354,6 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R
} else {
reqBuilder = builder.SetHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges, e.feedback)
}
if e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
reqBuilder.SetIsolationLevel(kv.RC)
}
reqBuilder.
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
Expand Down
61 changes: 29 additions & 32 deletions sessionctx/stmtctx/stmtctx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package stmtctx_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/execdetails"
Expand Down Expand Up @@ -96,49 +98,44 @@ func TestWeakConsistencyRead(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()

lastWeakConsistency := func(tk *testkit.TestKit) bool {
return tk.Session().GetSessionVars().StmtCtx.WeakConsistency
}

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int primary key, c int, c1 int, unique index i(c))")

execAndCheck := func(sql string, rows [][]interface{}, isolationLevel kv.IsoLevel) {
ctx := context.WithValue(context.Background(), "CheckSelectRequestHook", func(req *kv.Request) {
require.Equal(t, req.IsolationLevel, isolationLevel)
})
tk.Session().Execute(ctx, sql)
if rows != nil {
tk.MustQuery(sql).Check(rows)
}
lastWeakConsistency := tk.Session().GetSessionVars().StmtCtx.WeakConsistency
require.Equal(t, lastWeakConsistency, isolationLevel == kv.RC)
}

// strict
tk.MustExec("insert into t values(1, 1, 1)")
require.False(t, lastWeakConsistency(tk))
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 1"))
require.False(t, lastWeakConsistency(tk))
execAndCheck("insert into t values(1, 1, 1)", nil, kv.SI)
execAndCheck("select * from t", testkit.Rows("1 1 1"), kv.SI)
tk.MustExec("prepare s from 'select * from t'")
tk.MustExec("prepare u from 'update t set c1 = id + 1'")
tk.MustQuery("execute s").Check(testkit.Rows("1 1 1"))
require.False(t, lastWeakConsistency(tk))
tk.MustExec("execute u")
require.False(t, lastWeakConsistency(tk))
tk.MustExec("admin check table t")
require.False(t, lastWeakConsistency(tk))
execAndCheck("execute s", testkit.Rows("1 1 1"), kv.SI)
execAndCheck("execute u", nil, kv.SI)
execAndCheck("admin check table t", nil, kv.SI)
// weak
tk.MustExec("set tidb_read_consistency = weak")
tk.MustExec("insert into t values(2, 2, 2)")
require.False(t, lastWeakConsistency(tk))
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 2", "2 2 2"))
require.True(t, lastWeakConsistency(tk))
tk.MustQuery("execute s").Check(testkit.Rows("1 1 2", "2 2 2"))
require.True(t, lastWeakConsistency(tk))
tk.MustExec("execute u")
require.False(t, lastWeakConsistency(tk))
execAndCheck("insert into t values(2, 2, 2)", nil, kv.SI)
execAndCheck("select * from t", testkit.Rows("1 1 2", "2 2 2"), kv.RC)
execAndCheck("execute s", testkit.Rows("1 1 2", "2 2 2"), kv.RC)
execAndCheck("execute u", nil, kv.SI)
// non-read-only queries should be strict
tk.MustExec("admin check table t")
require.False(t, lastWeakConsistency(tk))
tk.MustExec("update t set c = c + 1 where id = 2")
require.False(t, lastWeakConsistency(tk))
tk.MustExec("delete from t where id = 2")
require.False(t, lastWeakConsistency(tk))
execAndCheck("admin check table t", nil, kv.SI)
execAndCheck("update t set c = c + 1 where id = 2", nil, kv.SI)
execAndCheck("delete from t where id = 2", nil, kv.SI)
// in-transaction queries should be strict
tk.MustExec("begin")
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 2"))
require.False(t, lastWeakConsistency(tk))
tk.MustQuery("execute s").Check(testkit.Rows("1 1 2"))
require.False(t, lastWeakConsistency(tk))
execAndCheck("select * from t", testkit.Rows("1 1 2"), kv.SI)
execAndCheck("execute s", testkit.Rows("1 1 2"), kv.SI)
tk.MustExec("rollback")
}
46 changes: 32 additions & 14 deletions tools/check/ut.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,10 @@ func cmdBuild(args ...string) bool {

// build all packages
if len(args) == 0 {
for _, pkg := range pkgs {
err := buildTestBinary(pkg)
if err != nil {
fmt.Println("build package error", pkg, err)
return false
}
err := buildTestBinaryMulti(pkgs)
if err != nil {
fmt.Println("build package error", pkgs, err)
return false
}
return true
}
Expand All @@ -163,16 +161,16 @@ func cmdRun(args ...string) bool {
return false
}
tasks := make([]task, 0, 5000)
start := time.Now()
// run all tests
if len(args) == 0 {
for _, pkg := range pkgs {
fmt.Println("handling package", pkg)
err := buildTestBinary(pkg)
if err != nil {
fmt.Println("build package error", pkg, err)
return false
}
err := buildTestBinaryMulti(pkgs)
if err != nil {
fmt.Println("build package error", pkgs, err)
return false
}

for _, pkg := range pkgs {
exist, err := testBinaryExist(pkg)
if err != nil {
fmt.Println("check test binary existance error", err)
Expand Down Expand Up @@ -248,7 +246,7 @@ func cmdRun(args ...string) bool {
}
tasks = tmp
}
fmt.Println("building task finish...", len(tasks))
fmt.Printf("building task finish, count=%d, takes=%v\n", len(tasks), time.Since(start))

numactl := numactlExist()
taskCh := make(chan task, 100)
Expand Down Expand Up @@ -446,7 +444,27 @@ func buildTestBinary(pkg string) error {
return withTrace(err)
}
return nil
}

// buildTestBinaryMulti is much faster than build the test packages one by one.
func buildTestBinaryMulti(pkgs []string) error {
// go test --exec=xprog -cover -vet=off --count=0 $(pkgs)
xprogPath := path.Join(workDir, "tools/bin/xprog")
packages := make([]string, 0, len(pkgs))
for _, pkg := range pkgs {
packages = append(packages, path.Join(modulePath, pkg))
}

var cmd *exec.Cmd
cmd = exec.Command("go", "test", "--exec", xprogPath, "-vet", "off", "-count", "0")
cmd.Args = append(cmd.Args, packages...)
cmd.Dir = workDir
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return withTrace(err)
}
return nil
}

func testBinaryExist(pkg string) (bool, error) {
Expand Down
118 changes: 118 additions & 0 deletions tools/check/xprog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"bufio"
"fmt"
"io"
"os"
"path/filepath"
"strings"
)

func main() {
// See https://github.com/golang/go/issues/15513#issuecomment-773994959
// go test --exec=xprog ./...
// Command line args looks like:
// '$CWD/xprog /tmp/go-build2662369829/b1382/aggfuncs.test -test.paniconexit0 -test.timeout=10m0s'
// This program moves the test binary /tmp/go-build2662369829/b1382/aggfuncs.test to someplace else for later use.

// Extract the current work directory
cwd := os.Args[0]
cwd = cwd[:len(cwd)-len("tools/bin/xprog")]

testBinaryPath := os.Args[1]
dir, _ := filepath.Split(testBinaryPath)

// Extract the package info from /tmp/go-build2662369829/b1382/importcfg.link
pkg := getPackageInfo(dir)

const prefix = "github.com/pingcap/tidb/"
if !strings.HasPrefix(pkg, prefix) {
os.Exit(-3)
}

// github.com/pingcap/tidb/util/topsql.test => util/topsql
pkg = pkg[len(prefix) : len(pkg)-len(".test")]

_, file := filepath.Split(pkg)

// The path of the destination file looks like $CWD/util/topsql/topsql.test.bin
newName := filepath.Join(cwd, pkg, file+".test.bin")

if err1 := os.Rename(testBinaryPath, newName); err1 != nil {
// Rename fail, handle error like "invalid cross-device linkcd tools/check"
err1 = MoveFile(testBinaryPath, newName)
if err1 != nil {
os.Exit(-4)
}
}
}

func getPackageInfo(dir string) string {
// Read the /tmp/go-build2662369829/b1382/importcfg.link file to get the package information
f, err := os.Open(filepath.Join(dir, "importcfg.link"))
if err != nil {
os.Exit(-1)
}
defer f.Close()

r := bufio.NewReader(f)
// packagefile github.com/pingcap/tidb/session.test=/home/genius/.cache/go-build/fb/fb1587cce5727fa9461131eab8260a52878da04f5c8da49dd3c7b2d941430c63-d
line, _, err := r.ReadLine()
if err != nil {
os.Exit(-2)
}
start := strings.IndexByte(string(line), ' ')
end := strings.IndexByte(string(line), '=')
pkg := string(line[start+1 : end])
return pkg
}

func MoveFile(sourcePath, destPath string) error {
inputFile, err := os.Open(sourcePath)
if err != nil {
return fmt.Errorf("Couldn't open source file: %s", err)
}
outputFile, err := os.Create(destPath)
if err != nil {
inputFile.Close()
return fmt.Errorf("Couldn't open dest file: %s", err)
}
defer outputFile.Close()
_, err = io.Copy(outputFile, inputFile)
inputFile.Close()
if err != nil {
return fmt.Errorf("Writing to output file failed: %s", err)
}

// Handle the permissions
si, err := os.Stat(sourcePath)
if err != nil {
return fmt.Errorf("Stat error: %s", err)
}
err = os.Chmod(destPath, si.Mode())
if err != nil {
return fmt.Errorf("Chmod error: %s", err)
}

// The copy was successful, so now delete the original file
err = os.Remove(sourcePath)
if err != nil {
return fmt.Errorf("Failed removing original file: %s", err)
}
return nil
}

0 comments on commit c485abc

Please sign in to comment.