Skip to content

Commit

Permalink
Merge branch 'master' into vecexpr_generate
Browse files Browse the repository at this point in the history
  • Loading branch information
gtygo authored Nov 4, 2019
2 parents f30d80d + f12403e commit 14db3ad
Show file tree
Hide file tree
Showing 29 changed files with 386 additions and 114 deletions.
2 changes: 1 addition & 1 deletion bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (s *testSuite) TestBindParse(c *C) {
c.Check(bind.UpdateTime, NotNil)

// Test fields with quotes or slashes.
sql = `CREATE GLOBAL BINDING FOR select * from t where i BETWEEN "a" and "b" USING select * from t use index(index_t) where i BETWEEN "a\nb\rc\td\0e" and "x"`
sql = `CREATE GLOBAL BINDING FOR select * from t where i BETWEEN "a" and "b" USING select * from t use index(index_t) where i BETWEEN "a\nb\rc\td\0e" and 'x'`
tk.MustExec(sql)
tk.MustExec(`DROP global binding for select * from t use index(idx) where i BETWEEN "a\nb\rc\td\0e" and "x"`)
}
Expand Down
39 changes: 20 additions & 19 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -430,39 +431,39 @@ func (c cache) getBindRecord(hash, normdOrigSQL, db string) *BindRecord {

func (h *BindHandle) deleteBindInfoSQL(normdOrigSQL, db string) string {
return fmt.Sprintf(
"DELETE FROM mysql.bind_info WHERE original_sql='%s' AND default_db='%s'",
normdOrigSQL,
db,
`DELETE FROM mysql.bind_info WHERE original_sql=%s AND default_db=%s`,
expression.Quote(normdOrigSQL),
expression.Quote(db),
)
}

func (h *BindHandle) insertBindInfoSQL(orignalSQL string, db string, info Binding) string {
return fmt.Sprintf(`INSERT INTO mysql.bind_info VALUES ('%s', '%s', '%s', '%s', '%s', '%s','%s', '%s')`,
orignalSQL,
info.BindSQL,
db,
info.Status,
info.CreateTime,
info.UpdateTime,
info.Charset,
info.Collation,
return fmt.Sprintf(`INSERT INTO mysql.bind_info VALUES (%s, %s, %s, %s, %s, %s, %s, %s)`,
expression.Quote(orignalSQL),
expression.Quote(info.BindSQL),
expression.Quote(db),
expression.Quote(info.Status),
expression.Quote(info.CreateTime.String()),
expression.Quote(info.UpdateTime.String()),
expression.Quote(info.Charset),
expression.Quote(info.Collation),
)
}

func (h *BindHandle) logicalDeleteBindInfoSQL(record *BindRecord, updateTs types.Time) string {
sql := fmt.Sprintf(`UPDATE mysql.bind_info SET status='%s',update_time='%s' WHERE original_sql='%s' and default_db='%s'`,
deleted,
updateTs,
record.OriginalSQL,
record.Db)
sql := fmt.Sprintf(`UPDATE mysql.bind_info SET status=%s,update_time=%s WHERE original_sql=%s and default_db=%s`,
expression.Quote(deleted),
expression.Quote(updateTs.String()),
expression.Quote(record.OriginalSQL),
expression.Quote(record.Db))
if len(record.Bindings) == 0 {
return sql
}
bindings := make([]string, 0, len(record.Bindings))
for _, bind := range record.Bindings {
bindings = append(bindings, fmt.Sprintf(`'%s'`, bind.BindSQL))
bindings = append(bindings, fmt.Sprintf(`%s`, expression.Quote(bind.BindSQL)))
}
return sql + fmt.Sprintf(" and bind_sql in (%s)", strings.Join(bindings, ","))
return sql + fmt.Sprintf(` and bind_sql in (%s)`, strings.Join(bindings, ","))
}

// GenHintsFromSQL is used to generate hints from SQL.
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx

// Lock the row key to notify us that someone delete or update the row,
// then we should not backfill the index of it, otherwise the adding index is redundant.
err := txn.LockKeys(context.Background(), nil, 0, idxRecord.key)
err := txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, idxRecord.key)
if err != nil {
return errors.Trace(err)
}
Expand Down
42 changes: 41 additions & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -535,7 +536,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
return nil
}
forUpdateTS := txnCtx.GetForUpdateTS()
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, keys...)
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, kv.LockAlwaysWait, keys...)
if err == nil {
return nil
}
Expand All @@ -546,6 +547,32 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
}
}

// GetTimestampWithRetry tries to get timestamp using retry and backoff mechanism
func (a *ExecStmt) GetTimestampWithRetry(ctx context.Context) (uint64, error) {
tsoMaxBackoff := 15000
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("ExecStmt.GetTimestampWithRetry", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
bo := tikv.NewBackoffer(ctx, tsoMaxBackoff)
for {
ts, err := a.Ctx.GetStore().GetOracle().GetTimestamp(ctx)
// mock get ts fail
failpoint.Inject("ExecStmtGetTsError", func() (uint64, error) {
return 0, errors.New("ExecStmtGetTsError")
})

if err == nil {
return ts, nil
}
err = bo.Backoff(tikv.BoPDRPC, errors.Errorf("ExecStmt get timestamp failed: %v", err))
if err != nil {
return 0, errors.Trace(err)
}
}
}

// handlePessimisticLockError updates TS and rebuild executor if the err is write conflict.
func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (Executor, error) {
txnCtx := a.Ctx.GetSessionVars().TxnCtx
Expand Down Expand Up @@ -573,6 +600,19 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
if conflictCommitTS > forUpdateTS {
newForUpdateTS = conflictCommitTS
}
} else if terror.ErrorEqual(err, tikv.ErrLockAcquireFailAndNoWaitSet) {
// for nowait, when ErrLock happened, ErrLockAcquireFailAndNoWaitSet will be returned, and in the same txn
// the select for updateTs must be updated, otherwise there maybe rollback problem.
// begin; select for update key1(here ErrLocked or other errors(or max_execution_time like util),
// key1 lock not get and async rollback key1 is raised)
// select for update key1 again(this time lock succ(maybe lock released by others))
// the async rollback operation rollbacked the lock just acquired
newForUpdateTS, tsErr := a.GetTimestampWithRetry(ctx)
if tsErr != nil {
return nil, tsErr
}
txnCtx.SetForUpdateTS(newForUpdateTS)
return nil, err
} else {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
}

recordKey := e.table.RecordKey(row.handle)
err := txn.LockKeys(ctx, nil, 0, recordKey)
err := txn.LockKeys(ctx, nil, 0, kv.LockAlwaysWait, recordKey)
if err != nil {
return result, err
}
Expand Down
16 changes: 12 additions & 4 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
// If there's no handle or it's not a `SELECT FOR UPDATE` statement.
if len(e.tblID2Handle) == 0 || e.Lock != ast.SelectLockForUpdate {
if len(e.tblID2Handle) == 0 || (e.Lock != ast.SelectLockForUpdate && e.Lock != ast.SelectLockForUpdateNoWait) {
return nil
}
if req.NumRows() != 0 {
Expand All @@ -815,18 +815,26 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
return nil
}
return doLockKeys(ctx, e.ctx, e.keys...)
var lockWaitTime = kv.LockAlwaysWait
if e.Lock == ast.SelectLockForUpdateNoWait {
lockWaitTime = kv.LockNoWait
}
return doLockKeys(ctx, e.ctx, lockWaitTime, e.keys...)
}

func doLockKeys(ctx context.Context, se sessionctx.Context, keys ...kv.Key) error {
// doLockKeys is the main entry for pessimistic lock keys
// waitTime means the lock operation will wait in milliseconds if target key is already
// locked by others. used for (select for update nowait) situation
// except 0 means alwaysWait 1 means nowait
func doLockKeys(ctx context.Context, se sessionctx.Context, waitTime int64, keys ...kv.Key) error {
se.GetSessionVars().TxnCtx.ForUpdate = true
// Lock keys only once when finished fetching all results.
txn, err := se.Txn(true)
if err != nil {
return err
}
forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS()
return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, keys...)
return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, waitTime, keys...)
}

// LimitExec represents limit executor
Expand Down
20 changes: 11 additions & 9 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
type PointGetExecutor struct {
baseExecutor

tblInfo *model.TableInfo
handle int64
idxInfo *model.IndexInfo
idxVals []types.Datum
startTS uint64
snapshot kv.Snapshot
done bool
lock bool
tblInfo *model.TableInfo
handle int64
idxInfo *model.IndexInfo
idxVals []types.Datum
startTS uint64
snapshot kv.Snapshot
done bool
lock bool
lockWaitTime int64
}

// Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
Expand All @@ -69,6 +70,7 @@ func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, startTs uint64) {
e.startTS = startTs
e.done = false
e.lock = p.Lock
e.lockWaitTime = p.LockWaitTime
}

// Open implements the Executor interface.
Expand Down Expand Up @@ -154,7 +156,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {

func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error {
if e.lock {
return doLockKeys(ctx, e.ctx, key)
return doLockKeys(ctx, e.ctx, e.lockWaitTime, key)
}
return nil
}
Expand Down
7 changes: 6 additions & 1 deletion expression/builtin_string.go
Original file line number Diff line number Diff line change
Expand Up @@ -2702,6 +2702,11 @@ func (b *builtinQuoteSig) evalString(row chunk.Row) (string, bool, error) {
return "NULL", false, err
}

return Quote(str), false, nil
}

// Quote produce a result that can be used as a properly escaped data value in an SQL statement.
func Quote(str string) string {
runes := []rune(str)
buffer := bytes.NewBufferString("")
buffer.WriteRune('\'')
Expand All @@ -2722,7 +2727,7 @@ func (b *builtinQuoteSig) evalString(row chunk.Row) (string, bool, error) {
}
buffer.WriteRune('\'')

return buffer.String(), false, nil
return buffer.String()
}

type binFunctionClass struct {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191031081038-bfb0c3adf567
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1 h1:qqGSXCFr9Uc5VIDBEt4zlmMcI8e4GlkWfDVzQ+dexRk=
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466 h1:C5nV9osqA+R/R2fxYxVfqAUlCi3Oo5yJ/JSKDeHSAOk=
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
10 changes: 9 additions & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ type Transaction interface {
// String implements fmt.Stringer interface.
String() string
// LockKeys tries to lock the entries with the keys in KV store.
LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, keys ...Key) error
LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, lockWaitTime int64, keys ...Key) error
// SetOption sets an option with a value, when val is nil, uses the default
// value of this option.
SetOption(opt Option, val interface{})
Expand Down Expand Up @@ -374,3 +374,11 @@ type SplitableStore interface {
WaitScatterRegionFinish(regionID uint64, backOff int) error
CheckRegionInScattering(regionID uint64) (bool, error)
}

// Used for pessimistic lock wait time
// these two constants are special for lock protocol with tikv
// 0 means always wait, -1 means nowait, others meaning lock wait in milliseconds
var (
LockAlwaysWait = int64(0)
LockNoWait = int64(-1)
)
2 changes: 1 addition & 1 deletion kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (t *mockTxn) String() string {
return ""
}

func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ ...Key) error {
func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ int64, _ ...Key) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion kv/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s testMockSuite) TestInterface(c *C) {

transaction, err := storage.Begin()
c.Check(err, IsNil)
err = transaction.LockKeys(context.Background(), nil, 0, Key("lock"))
err = transaction.LockKeys(context.Background(), nil, 0, LockAlwaysWait, Key("lock"))
c.Check(err, IsNil)
transaction.SetOption(Option(23), struct{}{})
if mock, ok := transaction.(*mockTxn); ok {
Expand Down
35 changes: 35 additions & 0 deletions planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -1018,3 +1019,37 @@ func (s *testAnalyzeSuite) TestUpdateProjEliminate(c *C) {
tk.MustExec("create table t(a int, b int)")
tk.MustExec("explain update t t1, (select distinct b from t) t2 set t1.b = t2.b")
}

func (s *testAnalyzeSuite) TestTiFlashCostModel(c *C) {
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
defer func() {
dom.Close()
store.Close()
}()

tk.MustExec("use test")
tk.MustExec("create table t (a int, b int, c int, primary key(a))")
tk.MustExec("insert into t values(1,1,1), (2,2,2), (3,3,3)")

tbl, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "t", L: "t"})
c.Assert(err, IsNil)
// Set the hacked TiFlash replica for explain tests.
tbl.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}

tk.MustQuery("desc select * from t").Check(testkit.Rows(
"TableReader_7 10000.00 root data:TableScan_6",
"└─TableScan_6 10000.00 cop[tiflash] table:t, range:[-inf,+inf], keep order:false, stats:pseudo",
))
tk.MustQuery("desc select * from t where t.a = 1 or t.a = 2").Check(testkit.Rows(
"TableReader_6 2.00 root data:TableScan_5",
"└─TableScan_5 2.00 cop[tikv] table:t, range:[1,1], [2,2], keep order:false, stats:pseudo",
))
tk.MustExec("set @@session.tidb_isolation_read_engines='tiflash'")
tk.MustQuery("desc select * from t where t.a = 1 or t.a = 2").Check(testkit.Rows(
"TableReader_7 2.00 root data:Selection_6",
"└─Selection_6 2.00 cop[tiflash] or(eq(Column#1, 1), eq(Column#1, 2))",
" └─TableScan_5 2.00 cop[tiflash] table:t, range:[-inf,+inf], keep order:false, stats:pseudo",
))
}
8 changes: 6 additions & 2 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,14 +1026,18 @@ func (ds *DataSource) getOriginalPhysicalTableScan(prop *property.PhysicalProper
Ranges: path.ranges,
AccessCondition: path.accessConds,
filterCondition: path.tableFilters,
StoreType: path.storeType,
}.Init(ds.ctx, ds.blockOffset)
if ds.preferStoreType&preferTiFlash != 0 {
ts.StoreType = kv.TiFlash
}
if ds.preferStoreType&preferTiKV != 0 {
ts.StoreType = kv.TiKV
}
if ts.StoreType == kv.TiFlash {
ts.filterCondition = append(ts.filterCondition, ts.AccessCondition...)
ts.AccessCondition = nil
ts.Ranges = ranger.FullIntRange(false)
} else {
ts.StoreType = kv.TiKV
}
ts.SetSchema(ds.schema)
if ts.Table.PKIsHandle {
Expand Down
Loading

0 comments on commit 14db3ad

Please sign in to comment.