Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#39368
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
XuHuaiyu authored and ti-chi-bot committed Nov 28, 2022
1 parent 5ccc10b commit 575139d
Show file tree
Hide file tree
Showing 9 changed files with 922 additions and 0 deletions.
22 changes: 22 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,7 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu
p = explain.TargetPlan
}
pi := util.ProcessInfo{
<<<<<<< HEAD
ID: s.sessionVars.ConnectionID,
Port: s.sessionVars.Port,
DB: s.sessionVars.CurrentDB,
Expand All @@ -1237,6 +1238,27 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu
StatsInfo: plannercore.GetStatsInfo,
MaxExecutionTime: maxExecutionTime,
RedactSQL: s.sessionVars.EnableRedactLog,
=======
ID: s.sessionVars.ConnectionID,
Port: s.sessionVars.Port,
DB: s.sessionVars.CurrentDB,
Command: command,
Plan: p,
PlanExplainRows: plannercore.GetExplainRowsForPlan(p),
RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl,
Time: t,
State: s.Status(),
Info: sql,
CurTxnStartTS: curTxnStartTS,
StmtCtx: s.sessionVars.StmtCtx,
RefCountOfStmtCtx: &s.sessionVars.RefCountOfStmtCtx,
MemTracker: s.sessionVars.MemTracker,
DiskTracker: s.sessionVars.DiskTracker,
StatsInfo: plannercore.GetStatsInfo,
OOMAlarmVariablesInfo: s.getOomAlarmVariablesInfo(),
MaxExecutionTime: maxExecutionTime,
RedactSQL: s.sessionVars.EnableRedactLog,
>>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368))
}
oldPi := s.ShowProcess()
if p == nil {
Expand Down
76 changes: 76 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,82 @@ type SQLWarn struct {
Err error
}

<<<<<<< HEAD
=======
type jsonSQLWarn struct {
Level string `json:"level"`
SQLErr *terror.Error `json:"err,omitempty"`
Msg string `json:"msg,omitempty"`
}

// MarshalJSON implements the Marshaler.MarshalJSON interface.
func (warn *SQLWarn) MarshalJSON() ([]byte, error) {
w := &jsonSQLWarn{
Level: warn.Level,
}
e := errors.Cause(warn.Err)
switch x := e.(type) {
case *terror.Error:
// Omit outter errors because only the most inner error matters.
w.SQLErr = x
default:
w.Msg = e.Error()
}
return json.Marshal(w)
}

// UnmarshalJSON implements the Unmarshaler.UnmarshalJSON interface.
func (warn *SQLWarn) UnmarshalJSON(data []byte) error {
var w jsonSQLWarn
if err := json.Unmarshal(data, &w); err != nil {
return err
}
warn.Level = w.Level
if w.SQLErr != nil {
warn.Err = w.SQLErr
} else {
warn.Err = errors.New(w.Msg)
}
return nil
}

// ReferenceCount indicates the reference count of StmtCtx.
type ReferenceCount int32

const (
// ReferenceCountIsFrozen indicates the current StmtCtx is resetting, it'll refuse all the access from other sessions.
ReferenceCountIsFrozen int32 = -1
// ReferenceCountNoReference indicates the current StmtCtx is not accessed by other sessions.
ReferenceCountNoReference int32 = 0
)

// TryIncrease tries to increase the reference count.
// There is a small chance that TryIncrease returns true while TryFreeze and
// UnFreeze are invoked successfully during the execution of TryIncrease.
func (rf *ReferenceCount) TryIncrease() bool {
refCnt := atomic.LoadInt32((*int32)(rf))
for ; refCnt != ReferenceCountIsFrozen && !atomic.CompareAndSwapInt32((*int32)(rf), refCnt, refCnt+1); refCnt = atomic.LoadInt32((*int32)(rf)) {
}
return refCnt != ReferenceCountIsFrozen
}

// Decrease decreases the reference count.
func (rf *ReferenceCount) Decrease() {
for refCnt := atomic.LoadInt32((*int32)(rf)); !atomic.CompareAndSwapInt32((*int32)(rf), refCnt, refCnt-1); refCnt = atomic.LoadInt32((*int32)(rf)) {
}
}

// TryFreeze tries to freeze the StmtCtx to frozen before resetting the old StmtCtx.
func (rf *ReferenceCount) TryFreeze() bool {
return atomic.LoadInt32((*int32)(rf)) == ReferenceCountNoReference && atomic.CompareAndSwapInt32((*int32)(rf), ReferenceCountNoReference, ReferenceCountIsFrozen)
}

// UnFreeze unfreeze the frozen StmtCtx thus the other session can access this StmtCtx.
func (rf *ReferenceCount) UnFreeze() {
atomic.StoreInt32((*int32)(rf), ReferenceCountNoReference)
}

>>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368))
// StatementContext contains variables for a statement.
// It should be reset before executing a statement.
type StatementContext struct {
Expand Down
19 changes: 19 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,11 @@ type SessionVars struct {
// StmtCtx holds variables for current executing statement.
StmtCtx *stmtctx.StatementContext

// RefCountOfStmtCtx indicates the reference count of StmtCtx. When the
// StmtCtx is accessed by other sessions, e.g. oom-alarm-handler/expensive-query-handler, add one first.
// Note: this variable should be accessed and updated by atomic operations.
RefCountOfStmtCtx stmtctx.ReferenceCount

// AllowAggPushDown can be set to false to forbid aggregation push down.
AllowAggPushDown bool

Expand Down Expand Up @@ -959,9 +964,23 @@ type SessionVars struct {

// InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
func (s *SessionVars) InitStatementContext() *stmtctx.StatementContext {
<<<<<<< HEAD
s.cached.curr = (s.cached.curr + 1) % 2
s.cached.data[s.cached.curr] = stmtctx.StatementContext{}
return &s.cached.data[s.cached.curr]
=======
sc := &s.cachedStmtCtx[0]
if sc == s.StmtCtx {
sc = &s.cachedStmtCtx[1]
}
if s.RefCountOfStmtCtx.TryFreeze() {
*sc = stmtctx.StatementContext{}
s.RefCountOfStmtCtx.UnFreeze()
} else {
sc = &stmtctx.StatementContext{}
}
return sc
>>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368))
}

// AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id if the query's
Expand Down
9 changes: 9 additions & 0 deletions util/expensivequery/expensivequery.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,15 @@ func genLogFields(costTime time.Duration, info *util.ProcessInfo) []zap.Field {
}

// logExpensiveQuery logs the queries which exceed the time threshold or memory threshold.
<<<<<<< HEAD
func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo) {
logutil.BgLogger().Warn("expensive_query", genLogFields(costTime, info)...)
=======
func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo, msg string) {
fields := util.GenLogFields(costTime, info, true)
if fields == nil {
return
}
logutil.BgLogger().Warn(msg, fields...)
>>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368))
}
Loading

0 comments on commit 575139d

Please sign in to comment.