Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#50828
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
guo-shaoge authored and ti-chi-bot committed Feb 20, 2024
1 parent 7a5f72e commit 3b304b7
Showing 1 changed file with 108 additions and 4 deletions.
112 changes: 108 additions & 4 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,29 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
<<<<<<< HEAD:executor/cte.go
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/cteutil"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/memory"
=======
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/cteutil"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/disk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"go.uber.org/zap"
>>>>>>> fa340f3400a (executor: fix CTE goroutine leak when exceeds mem quota (#50828)):pkg/executor/cte.go
)

var _ Executor = &CTEExec{}
Expand Down Expand Up @@ -83,6 +99,9 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
if e.producer.isInApply {
e.producer.reset()
}
if e.producer.openErr != nil {
return e.producer.openErr
}
if !e.producer.opened {
if err = e.producer.openProducer(ctx, e); err != nil {
return err
Expand All @@ -103,7 +122,18 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
return e.producer.getChunk(ctx, e, req)
}

func setFirstErr(firstErr error, newErr error, msg string) error {
if newErr != nil {
logutil.BgLogger().Error("cte got error", zap.Any("err", newErr), zap.Any("extra msg", msg))
if firstErr == nil {
firstErr = newErr
}
}
return firstErr
}

// Close implements the Executor interface.
<<<<<<< HEAD:executor/cte.go
func (e *CTEExec) Close() (err error) {
e.producer.resTbl.Lock()
if !e.producer.closed {
Expand All @@ -114,6 +144,31 @@ func (e *CTEExec) Close() (err error) {
return err
}
return e.baseExecutor.Close()
=======
func (e *CTEExec) Close() (firstErr error) {
func() {
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()
if !e.producer.closed {
failpoint.Inject("mock_cte_exec_panic_avoid_deadlock", func(v failpoint.Value) {
ok := v.(bool)
if ok {
// mock an oom panic, returning ErrMemoryExceedForQuery for error identification in recovery work.
panic(exeerrors.ErrMemoryExceedForQuery)
}
})
// closeProducer() only close seedExec and recursiveExec, will not touch resTbl.
// It means you can still read resTbl after call closeProducer().
// You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next().
// Separating these three function calls is only to follow the abstraction of the volcano model.
err := e.producer.closeProducer()
firstErr = setFirstErr(firstErr, err, "close cte producer error")
}
}()
err := e.BaseExecutor.Close()
firstErr = setFirstErr(firstErr, err, "close cte children error")
return
>>>>>>> fa340f3400a (executor: fix CTE goroutine leak when exceeds mem quota (#50828)):pkg/executor/cte.go
}

func (e *CTEExec) reset() {
Expand All @@ -123,10 +178,16 @@ func (e *CTEExec) reset() {
}

type cteProducer struct {
// opened should be false when not open or open fail(a.k.a. openErr != nil)
opened bool
produced bool
closed bool

// cteProducer is shared by multiple operators, so if the first operator tries to open
// and got error, the second should return open error directly instead of open again.
// Otherwise there may be resource leak because Close() only clean resource for the last Open().
openErr error

ctx sessionctx.Context

seedExec Executor
Expand Down Expand Up @@ -161,15 +222,29 @@ type cteProducer struct {
}

func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) {
defer func() {
p.openErr = err
if err == nil {
p.opened = true
} else {
p.opened = false
}
}()
if p.seedExec == nil {
return errors.New("seedExec for CTEExec is nil")
}
if err = p.seedExec.Open(ctx); err != nil {
return err
}

<<<<<<< HEAD:executor/cte.go
p.memTracker = memory.NewTracker(cteExec.id, -1)
p.diskTracker = disk.NewTracker(cteExec.id, -1)
=======
p.resetTracker()
p.memTracker = memory.NewTracker(cteExec.ID(), -1)
p.diskTracker = disk.NewTracker(cteExec.ID(), -1)
>>>>>>> fa340f3400a (executor: fix CTE goroutine leak when exceeds mem quota (#50828)):pkg/executor/cte.go
p.memTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.MemTracker)
p.diskTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.DiskTracker)

Expand Down Expand Up @@ -198,10 +273,10 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e
p.hCtx.keyColIdx[i] = i
}
}
p.opened = true
return nil
}

<<<<<<< HEAD:executor/cte.go
func (p *cteProducer) closeProducer() (err error) {
if err = p.seedExec.Close(); err != nil {
return err
Expand All @@ -210,21 +285,38 @@ func (p *cteProducer) closeProducer() (err error) {
if err = p.recursiveExec.Close(); err != nil {
return err
}
=======
func (p *cteProducer) closeProducer() (firstErr error) {
err := exec.Close(p.seedExec)
firstErr = setFirstErr(firstErr, err, "close seedExec err")

if p.recursiveExec != nil {
err = exec.Close(p.recursiveExec)
firstErr = setFirstErr(firstErr, err, "close recursiveExec err")

>>>>>>> fa340f3400a (executor: fix CTE goroutine leak when exceeds mem quota (#50828)):pkg/executor/cte.go
// `iterInTbl` and `resTbl` are shared by multiple operators,
// so will be closed when the SQL finishes.
if p.iterOutTbl != nil {
if err = p.iterOutTbl.DerefAndClose(); err != nil {
return err
}
err = p.iterOutTbl.DerefAndClose()
firstErr = setFirstErr(firstErr, err, "deref iterOutTbl err")
}
}
// Reset to nil instead of calling Detach(),
// because ExplainExec still needs tracker to get mem usage info.
p.memTracker = nil
p.diskTracker = nil
p.closed = true
<<<<<<< HEAD:executor/cte.go
if p.isInApply {
if err = p.reopenTbls(); err != nil {
return err
}
}
return nil
=======
return
>>>>>>> fa340f3400a (executor: fix CTE goroutine leak when exceeds mem quota (#50828)):pkg/executor/cte.go
}

func (p *cteProducer) getChunk(ctx context.Context, cteExec *CTEExec, req *chunk.Chunk) (err error) {
Expand Down Expand Up @@ -467,10 +559,22 @@ func (p *cteProducer) reset() {
p.hashTbl = nil

p.opened = false
p.openErr = nil
p.produced = false
p.closed = false
}

func (p *cteProducer) resetTracker() {
if p.memTracker != nil {
p.memTracker.Reset()
p.memTracker = nil
}
if p.diskTracker != nil {
p.diskTracker.Reset()
p.diskTracker = nil
}
}

func (p *cteProducer) reopenTbls() (err error) {
if p.isDistinct {
p.hashTbl = newConcurrentMapHashTable()
Expand Down

0 comments on commit 3b304b7

Please sign in to comment.