Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor,server: re-implement the kill statement by checking the Next() function (10841) #10879

Merged
merged 3 commits into from
Jun 20, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
@@ -98,7 +98,7 @@ func schema2ResultFields(schema *expression.Schema, defaultDB string) (rfs []*as
// next query.
// If stmt is not nil and chunk with some rows inside, we simply update last query found rows by the number of row in chunk.
func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error {
err := a.executor.Next(ctx, chk)
err := Next(ctx, a.executor, chk)
if err != nil {
a.lastErr = err
return errors.Trace(err)
@@ -286,7 +286,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
a.logAudit()
}()

err = e.Next(ctx, e.newFirstChunk())
err = Next(ctx, e, e.newFirstChunk())
if err != nil {
return nil, errors.Trace(err)
}
6 changes: 3 additions & 3 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
@@ -549,7 +549,7 @@ func (e *HashAggExec) fetchChildData(ctx context.Context) {
}
chk = input.chk
}
err = e.children[0].Next(ctx, chk)
err = Next(ctx, e.children[0], chk)
if err != nil {
e.finalOutputCh <- &AfFinalResult{err: errors.Trace(err)}
return
@@ -669,7 +669,7 @@ func (e *HashAggExec) unparallelExec(ctx context.Context, chk *chunk.Chunk) erro
func (e *HashAggExec) execute(ctx context.Context) (err error) {
inputIter := chunk.NewIterator4Chunk(e.childResult)
for {
err := e.children[0].Next(ctx, e.childResult)
err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return errors.Trace(err)
}
@@ -856,7 +856,7 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch
return errors.Trace(err)
}

err = e.children[0].Next(ctx, e.childResult)
err = Next(ctx, e.children[0], e.childResult)
if err != nil {
return errors.Trace(err)
}
4 changes: 2 additions & 2 deletions executor/delete.go
Original file line number Diff line number Diff line change
@@ -100,7 +100,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
for {
iter := chunk.NewIterator4Chunk(chk)

err := e.children[0].Next(ctx, chk)
err := Next(ctx, e.children[0], chk)
if err != nil {
return errors.Trace(err)
}
@@ -177,7 +177,7 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
chk := e.children[0].newFirstChunk()
for {
iter := chunk.NewIterator4Chunk(chk)
err := e.children[0].Next(ctx, chk)
err := Next(ctx, e.children[0], chk)
if err != nil {
return errors.Trace(err)
}
2 changes: 1 addition & 1 deletion executor/distsql.go
Original file line number Diff line number Diff line change
@@ -754,7 +754,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
task.rows = make([]chunk.Row, 0, handleCnt)
for {
chk := tableReader.newFirstChunk()
err = tableReader.Next(ctx, chk)
err = Next(ctx, tableReader, chk)
if err != nil {
logutil.Logger(ctx).Error("table reader fetch next chunk failed", zap.Error(err))
return errors.Trace(err)
2 changes: 2 additions & 0 deletions executor/errors.go
Original file line number Diff line number Diff line change
@@ -45,6 +45,7 @@ var (
ErrCantChangeTxCharacteristics = terror.ClassExecutor.New(mysql.ErrCantChangeTxCharacteristics, mysql.MySQLErrName[mysql.ErrCantChangeTxCharacteristics])
ErrPsManyParam = terror.ClassExecutor.New(mysql.ErrPsManyParam, mysql.MySQLErrName[mysql.ErrPsManyParam])
ErrAdminCheckTable = terror.ClassExecutor.New(mysql.ErrAdminCheckTable, mysql.MySQLErrName[mysql.ErrAdminCheckTable])
ErrQueryInterrupted = terror.ClassExecutor.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted])
)

func init() {
@@ -57,6 +58,7 @@ func init() {
mysql.ErrCantChangeTxCharacteristics: mysql.ErrCantChangeTxCharacteristics,
mysql.ErrPsManyParam: mysql.ErrPsManyParam,
mysql.ErrAdminCheckTable: mysql.ErrAdminCheckTable,
mysql.ErrQueryInterrupted: mysql.ErrQueryInterrupted,
}
terror.ErrClassToMySQLCodes[terror.ClassExecutor] = tableMySQLErrCodes
}
31 changes: 23 additions & 8 deletions executor/executor.go
Original file line number Diff line number Diff line change
@@ -83,6 +83,10 @@ type baseExecutor struct {
runtimeStats *execdetails.RuntimeStats
}

func (e *baseExecutor) base() *baseExecutor {
return e
}

// Open initializes children recursively and "childrenResults" according to children's schemas.
func (e *baseExecutor) Open(ctx context.Context) error {
for _, child := range e.children {
@@ -161,6 +165,7 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin
// return a batch of rows, other than a single row in Volcano.
// NOTE: Executors must call "chk.Reset()" before appending their results to it.
type Executor interface {
base() *baseExecutor
Open(context.Context) error
Next(ctx context.Context, chk *chunk.Chunk) error
Close() error
@@ -170,6 +175,16 @@ type Executor interface {
newFirstChunk() *chunk.Chunk
}

// Next is a wrapper function on e.Next(), it handles some common codes.
func Next(ctx context.Context, e Executor, chk *chunk.Chunk) error {
sessVars := e.base().ctx.GetSessionVars()
if atomic.CompareAndSwapUint32(&sessVars.Killed, 1, 0) {
return ErrQueryInterrupted
}

return e.Next(ctx, chk)
}

// CancelDDLJobsExec represents a cancel DDL jobs executor.
type CancelDDLJobsExec struct {
baseExecutor
@@ -533,7 +548,7 @@ func (e *CheckIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
}
chk = e.src.newFirstChunk()
for {
err := e.src.Next(ctx, chk)
err := Next(ctx, e.src, chk)
if err != nil {
return errors.Trace(err)
}
@@ -636,7 +651,7 @@ func (e *SelectLockExec) Open(ctx context.Context) error {
// Next implements the Executor Next interface.
func (e *SelectLockExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.GrowAndReset(e.maxChunkSize)
err := e.children[0].Next(ctx, chk)
err := Next(ctx, e.children[0], chk)
if err != nil {
return errors.Trace(err)
}
@@ -693,7 +708,7 @@ func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
for !e.meetFirstBatch {
// transfer req's requiredRows to childResult and then adjust it in childResult
e.childResult = e.childResult.SetRequiredRows(chk.RequiredRows(), e.maxChunkSize)
err := e.children[0].Next(ctx, e.adjustRequiredRows(e.childResult))
err := Next(ctx, e.children[0], e.adjustRequiredRows(e.childResult))
if err != nil {
return errors.Trace(err)
}
@@ -718,7 +733,7 @@ func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
e.cursor += batchSize
}
e.adjustRequiredRows(chk)
err := e.children[0].Next(ctx, chk)
err := Next(ctx, e.children[0], chk)
if err != nil {
return errors.Trace(err)
}
@@ -788,7 +803,7 @@ func init() {
}
chk := exec.newFirstChunk()
for {
err = exec.Next(ctx, chk)
err = Next(ctx, exec, chk)
if err != nil {
return rows, errors.Trace(err)
}
@@ -897,7 +912,7 @@ func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
}
chk.AppendRow(e.inputRow)
}
err := e.children[0].Next(ctx, e.childResult)
err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return errors.Trace(err)
}
@@ -929,7 +944,7 @@ func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) err
return nil
}
}
err := e.children[0].Next(ctx, e.childResult)
err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return errors.Trace(err)
}
@@ -1069,7 +1084,7 @@ func (e *MaxOneRowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
return nil
}
e.evaluated = true
err := e.children[0].Next(ctx, chk)
err := Next(ctx, e.children[0], chk)
if err != nil {
return errors.Trace(err)
}
2 changes: 1 addition & 1 deletion executor/explain.go
Original file line number Diff line number Diff line change
@@ -73,7 +73,7 @@ func (e *ExplainExec) generateExplainInfo(ctx context.Context) ([][]string, erro
if e.analyzeExec != nil {
chk := e.analyzeExec.newFirstChunk()
for {
err := e.analyzeExec.Next(ctx, chk)
err := Next(ctx, e.analyzeExec, chk)
if err != nil {
return nil, err
}
4 changes: 2 additions & 2 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
@@ -363,7 +363,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {

task.memTracker.Consume(task.outerResult.MemoryUsage())
for task.outerResult.NumRows() < ow.batchSize {
err := ow.executor.Next(ctx, ow.executorChk)
err := Next(ctx, ow.executor, ow.executorChk)
if err != nil {
return task, errors.Trace(err)
}
@@ -555,7 +555,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
innerResult.GetMemTracker().SetLabel("inner result")
innerResult.GetMemTracker().AttachTo(task.memTracker)
for {
err := innerExec.Next(ctx, iw.executorChk)
err := Next(ctx, innerExec, iw.executorChk)
if err != nil {
return errors.Trace(err)
}
2 changes: 1 addition & 1 deletion executor/insert_common.go
Original file line number Diff line number Diff line change
@@ -309,7 +309,7 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, exec func(rows
batchInsert := (sessVars.BatchInsert && !sessVars.InTxn()) || config.GetGlobalConfig().EnableBatchDML

for {
err := selectExec.Next(ctx, chk)
err := Next(ctx, selectExec, chk)
if err != nil {
return errors.Trace(err)
}
8 changes: 4 additions & 4 deletions executor/join.go
Original file line number Diff line number Diff line change
@@ -203,7 +203,7 @@ func (e *HashJoinExec) fetchOuterChunks(ctx context.Context) {
}
}
outerResult := outerResource.chk
err := e.outerExec.Next(ctx, outerResult)
err := Next(ctx, e.outerExec, outerResult)
if err != nil {
e.joinResultCh <- &hashjoinWorkerResult{
err: errors.Trace(err),
@@ -268,7 +268,7 @@ func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.C
return
}
chk := e.children[e.innerIdx].newFirstChunk()
err = e.innerExec.Next(ctx, chk)
err = Next(ctx, e.innerExec, chk)
if err != nil {
e.innerFinished <- errors.Trace(err)
return
@@ -648,7 +648,7 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch
outerIter := chunk.NewIterator4Chunk(e.outerChunk)
for {
if e.outerChunkCursor >= e.outerChunk.NumRows() {
err := e.outerExec.Next(ctx, e.outerChunk)
err := Next(ctx, e.outerExec, e.outerChunk)
if err != nil {
return nil, errors.Trace(err)
}
@@ -685,7 +685,7 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {
e.innerList.Reset()
innerIter := chunk.NewIterator4Chunk(e.innerChunk)
for {
err := e.innerExec.Next(ctx, e.innerChunk)
err := Next(ctx, e.innerExec, e.innerChunk)
if err != nil {
return errors.Trace(err)
}
4 changes: 2 additions & 2 deletions executor/merge_join.go
Original file line number Diff line number Diff line change
@@ -139,7 +139,7 @@ func (t *mergeJoinInnerTable) nextRow() (chunk.Row, error) {
if t.curRow == t.curIter.End() {
t.reallocReaderResult()
oldMemUsage := t.curResult.MemoryUsage()
err := t.reader.Next(t.ctx, t.curResult)
err := Next(t.ctx, t.reader, t.curResult)
// error happens or no more data.
if err != nil || t.curResult.NumRows() == 0 {
t.curRow = t.curIter.End()
@@ -378,7 +378,7 @@ func (e *MergeJoinExec) fetchNextInnerRows() (err error) {
// may not all belong to the same join key, but are guaranteed to be sorted
// according to the join key.
func (e *MergeJoinExec) fetchNextOuterRows(ctx context.Context) (err error) {
err = e.outerTable.reader.Next(ctx, e.outerTable.chk)
err = Next(ctx, e.outerTable.reader, e.outerTable.chk)
if err != nil {
return errors.Trace(err)
}
46 changes: 13 additions & 33 deletions executor/point_get.go
Original file line number Diff line number Diff line change
@@ -18,10 +18,8 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
@@ -38,22 +36,23 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
b.err = errors.Trace(err)
return nil
}
return &PointGetExecutor{
ctx: b.ctx,
schema: p.Schema(),
tblInfo: p.TblInfo,
idxInfo: p.IndexInfo,
idxVals: p.IndexValues,
handle: p.Handle,
startTS: startTS,
}
e := &PointGetExecutor{
baseExecutor: newBaseExecutor(b.ctx, p.Schema(), p.ExplainID()),
tblInfo: p.TblInfo,
idxInfo: p.IndexInfo,
idxVals: p.IndexValues,
handle: p.Handle,
startTS: startTS,
}
e.base().initCap = 1
e.base().maxChunkSize = 1
return e
}

// PointGetExecutor executes point select query.
type PointGetExecutor struct {
ctx sessionctx.Context
schema *expression.Schema
tps []*types.FieldType
baseExecutor

tblInfo *model.TableInfo
handle int64
idxInfo *model.IndexInfo
@@ -232,22 +231,3 @@ func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
}
return nil
}

// Schema implements the Executor interface.
func (e *PointGetExecutor) Schema() *expression.Schema {
return e.schema
}

func (e *PointGetExecutor) retTypes() []*types.FieldType {
if e.tps == nil {
e.tps = make([]*types.FieldType, e.schema.Len())
for i := range e.schema.Columns {
e.tps[i] = e.schema.Columns[i].RetType
}
}
return e.tps
}

func (e *PointGetExecutor) newFirstChunk() *chunk.Chunk {
return chunk.New(e.retTypes(), 1, 1)
}
4 changes: 2 additions & 2 deletions executor/projection.go
Original file line number Diff line number Diff line change
@@ -173,7 +173,7 @@ func (e *ProjectionExec) isUnparallelExec() bool {
func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk) error {
// push requiredRows down
e.childResult.SetRequiredRows(chk.RequiredRows(), e.maxChunkSize)
err := e.children[0].Next(ctx, e.childResult)
err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return errors.Trace(err)
}
@@ -306,7 +306,7 @@ func (f *projectionInputFetcher) run(ctx context.Context) {

requiredRows := atomic.LoadInt64(&f.proj.parentReqRows)
input.chk.SetRequiredRows(int(requiredRows), f.proj.maxChunkSize)
err := f.child.Next(ctx, input.chk)
err := Next(ctx, f.child, input.chk)
if err != nil || input.chk.NumRows() == 0 {
output.done <- errors.Trace(err)
return
Loading