From 575139db5cfc64f6613db84a4a01c83d5b2ff240 Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Mon, 28 Nov 2022 17:02:01 +0800 Subject: [PATCH] This is an automated cherry-pick of #39368 Signed-off-by: ti-chi-bot --- session/session.go | 22 + sessionctx/stmtctx/stmtctx.go | 76 ++++ sessionctx/variable/session.go | 19 + util/expensivequery/expensivequery.go | 9 + util/memoryusagealarm/memoryusagealarm.go | 380 ++++++++++++++++++ .../memoryusagealarm/memoryusagealarm_test.go | 145 +++++++ util/processinfo.go | 21 + util/util.go | 176 ++++++++ util/util_test.go | 74 ++++ 9 files changed, 922 insertions(+) create mode 100644 util/memoryusagealarm/memoryusagealarm.go create mode 100644 util/memoryusagealarm/memoryusagealarm_test.go create mode 100644 util/util.go create mode 100644 util/util_test.go diff --git a/session/session.go b/session/session.go index 9084ee7c98b06..0ffd92473cf3e 100644 --- a/session/session.go +++ b/session/session.go @@ -1222,6 +1222,7 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu p = explain.TargetPlan } pi := util.ProcessInfo{ +<<<<<<< HEAD ID: s.sessionVars.ConnectionID, Port: s.sessionVars.Port, DB: s.sessionVars.CurrentDB, @@ -1237,6 +1238,27 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu StatsInfo: plannercore.GetStatsInfo, MaxExecutionTime: maxExecutionTime, RedactSQL: s.sessionVars.EnableRedactLog, +======= + ID: s.sessionVars.ConnectionID, + Port: s.sessionVars.Port, + DB: s.sessionVars.CurrentDB, + Command: command, + Plan: p, + PlanExplainRows: plannercore.GetExplainRowsForPlan(p), + RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl, + Time: t, + State: s.Status(), + Info: sql, + CurTxnStartTS: curTxnStartTS, + StmtCtx: s.sessionVars.StmtCtx, + RefCountOfStmtCtx: &s.sessionVars.RefCountOfStmtCtx, + MemTracker: s.sessionVars.MemTracker, + DiskTracker: s.sessionVars.DiskTracker, + StatsInfo: plannercore.GetStatsInfo, + OOMAlarmVariablesInfo: s.getOomAlarmVariablesInfo(), + MaxExecutionTime: maxExecutionTime, + RedactSQL: s.sessionVars.EnableRedactLog, +>>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368)) } oldPi := s.ShowProcess() if p == nil { diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 85ea1bfa6b373..0cb460bfccf77 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -56,6 +56,82 @@ type SQLWarn struct { Err error } +<<<<<<< HEAD +======= +type jsonSQLWarn struct { + Level string `json:"level"` + SQLErr *terror.Error `json:"err,omitempty"` + Msg string `json:"msg,omitempty"` +} + +// MarshalJSON implements the Marshaler.MarshalJSON interface. +func (warn *SQLWarn) MarshalJSON() ([]byte, error) { + w := &jsonSQLWarn{ + Level: warn.Level, + } + e := errors.Cause(warn.Err) + switch x := e.(type) { + case *terror.Error: + // Omit outter errors because only the most inner error matters. + w.SQLErr = x + default: + w.Msg = e.Error() + } + return json.Marshal(w) +} + +// UnmarshalJSON implements the Unmarshaler.UnmarshalJSON interface. +func (warn *SQLWarn) UnmarshalJSON(data []byte) error { + var w jsonSQLWarn + if err := json.Unmarshal(data, &w); err != nil { + return err + } + warn.Level = w.Level + if w.SQLErr != nil { + warn.Err = w.SQLErr + } else { + warn.Err = errors.New(w.Msg) + } + return nil +} + +// ReferenceCount indicates the reference count of StmtCtx. 
+type ReferenceCount int32
+
+const (
+	// ReferenceCountIsFrozen indicates the current StmtCtx is being reset; all access from other sessions is refused.
+	ReferenceCountIsFrozen int32 = -1
+	// ReferenceCountNoReference indicates the current StmtCtx is not accessed by other sessions.
+	ReferenceCountNoReference int32 = 0
+)
+
+// TryIncrease tries to increase the reference count.
+// There is a small chance that TryIncrease returns true while TryFreeze and
+// UnFreeze are invoked successfully during the execution of TryIncrease.
+func (rf *ReferenceCount) TryIncrease() bool {
+	refCnt := atomic.LoadInt32((*int32)(rf))
+	for ; refCnt != ReferenceCountIsFrozen && !atomic.CompareAndSwapInt32((*int32)(rf), refCnt, refCnt+1); refCnt = atomic.LoadInt32((*int32)(rf)) {
+	}
+	return refCnt != ReferenceCountIsFrozen
+}
+
+// Decrease decreases the reference count.
+func (rf *ReferenceCount) Decrease() {
+	for refCnt := atomic.LoadInt32((*int32)(rf)); !atomic.CompareAndSwapInt32((*int32)(rf), refCnt, refCnt-1); refCnt = atomic.LoadInt32((*int32)(rf)) {
+	}
+}
+
+// TryFreeze tries to freeze the StmtCtx before resetting the old StmtCtx.
+func (rf *ReferenceCount) TryFreeze() bool {
+	return atomic.LoadInt32((*int32)(rf)) == ReferenceCountNoReference && atomic.CompareAndSwapInt32((*int32)(rf), ReferenceCountNoReference, ReferenceCountIsFrozen)
+}
+
+// UnFreeze unfreezes the frozen StmtCtx so that other sessions can access it again.
+func (rf *ReferenceCount) UnFreeze() {
+	atomic.StoreInt32((*int32)(rf), ReferenceCountNoReference)
+}
+
+>>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368))
 // StatementContext contains variables for a statement.
 // It should be reset before executing a statement.
 type StatementContext struct {
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index 1c46e53cb1878..8a720aa4cc548 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -547,6 +547,11 @@ type SessionVars struct {
 	// StmtCtx holds variables for current executing statement.
 	StmtCtx *stmtctx.StatementContext
 
+	// RefCountOfStmtCtx indicates the reference count of StmtCtx. When the
+	// StmtCtx is accessed by other sessions, e.g. oom-alarm-handler/expensive-query-handler, add one first.
+	// Note: this variable should be accessed and updated by atomic operations.
+	RefCountOfStmtCtx stmtctx.ReferenceCount
+
 	// AllowAggPushDown can be set to false to forbid aggregation push down.
 	AllowAggPushDown bool
 
@@ -959,9 +964,23 @@ type SessionVars struct {
 
 // InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
 func (s *SessionVars) InitStatementContext() *stmtctx.StatementContext {
+<<<<<<< HEAD
 	s.cached.curr = (s.cached.curr + 1) % 2
 	s.cached.data[s.cached.curr] = stmtctx.StatementContext{}
 	return &s.cached.data[s.cached.curr]
+=======
+	sc := &s.cachedStmtCtx[0]
+	if sc == s.StmtCtx {
+		sc = &s.cachedStmtCtx[1]
+	}
+	if s.RefCountOfStmtCtx.TryFreeze() {
+		*sc = stmtctx.StatementContext{}
+		s.RefCountOfStmtCtx.UnFreeze()
+	} else {
+		sc = &stmtctx.StatementContext{}
+	}
+	return sc
+>>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368))
 }
 
 // AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id if the query's
diff --git a/util/expensivequery/expensivequery.go b/util/expensivequery/expensivequery.go
index 3657c60a69797..1cabdc32fcd5c 100644
--- a/util/expensivequery/expensivequery.go
+++ b/util/expensivequery/expensivequery.go
@@ -175,6 +175,15 @@ func genLogFields(costTime time.Duration, info *util.ProcessInfo) []zap.Field {
 }
 
 // logExpensiveQuery logs the queries which exceed the time threshold or memory threshold.
+<<<<<<< HEAD
 func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo) {
 	logutil.BgLogger().Warn("expensive_query", genLogFields(costTime, info)...)
+=======
+func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo, msg string) {
+	fields := util.GenLogFields(costTime, info, true)
+	if fields == nil {
+		return
+	}
+	logutil.BgLogger().Warn(msg, fields...)
+>>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368))
 }
diff --git a/util/memoryusagealarm/memoryusagealarm.go b/util/memoryusagealarm/memoryusagealarm.go
new file mode 100644
index 0000000000000..c8a6fd0eaecda
--- /dev/null
+++ b/util/memoryusagealarm/memoryusagealarm.go
@@ -0,0 +1,380 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package memoryusagealarm
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	rpprof "runtime/pprof"
+	"strings"
+	"sync/atomic"
+	"time"
+
+	"github.com/pingcap/tidb/config"
+	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/util"
+	"github.com/pingcap/tidb/util/disk"
+	"github.com/pingcap/tidb/util/logutil"
+	"github.com/pingcap/tidb/util/memory"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+	"golang.org/x/exp/slices"
+)
+
+// Handle is the handler for the memory usage alarm.
+type Handle struct {
+	exitCh chan struct{}
+	sm     atomic.Value
+}
+
+// NewMemoryUsageAlarmHandle builds a memory usage alarm handler.
+func NewMemoryUsageAlarmHandle(exitCh chan struct{}) *Handle {
+	return &Handle{exitCh: exitCh}
+}
+
+// SetSessionManager sets the SessionManager which is used to fetch the info
+// of all active sessions.
+func (eqh *Handle) SetSessionManager(sm util.SessionManager) *Handle {
+	eqh.sm.Store(sm)
+	return eqh
+}
+
+// Run starts the memory usage alarm goroutine when the server starts.
+func (eqh *Handle) Run() {
+	// Use 100ms as the tick interval for now; it may be replaced by a given interval or a system variable later.
+	tickInterval := time.Millisecond * time.Duration(100)
+	ticker := time.NewTicker(tickInterval)
+	defer ticker.Stop()
+	sm := eqh.sm.Load().(util.SessionManager)
+	record := &memoryUsageAlarm{}
+	for {
+		select {
+		case <-ticker.C:
+			record.alarm4ExcessiveMemUsage(sm)
+		case <-eqh.exitCh:
+			return
+		}
+	}
+}
+
+type memoryUsageAlarm struct {
+	lastCheckTime                 time.Time
+	lastUpdateVariableTime        time.Time
+	err                           error
+	baseRecordDir                 string
+	lastRecordDirName             []string
+	lastRecordMemUsed             uint64
+	memoryUsageAlarmRatio         float64
+	memoryUsageAlarmKeepRecordNum int64
+	serverMemoryLimit             uint64
+	isServerMemoryLimitSet        bool
+	initialized                   bool
+}
+
+func (record *memoryUsageAlarm) updateVariable() {
+	if time.Since(record.lastUpdateVariableTime) < 60*time.Second {
+		return
+	}
+	record.memoryUsageAlarmRatio = variable.MemoryUsageAlarmRatio.Load()
+	record.memoryUsageAlarmKeepRecordNum = variable.MemoryUsageAlarmKeepRecordNum.Load()
+	record.serverMemoryLimit = memory.ServerMemoryLimit.Load()
+	if record.serverMemoryLimit != 0 {
+		record.isServerMemoryLimitSet = true
+	} else {
+		record.serverMemoryLimit, record.err = memory.MemTotal()
+		if record.err != nil {
+			logutil.BgLogger().Error("get system total memory fail", zap.Error(record.err))
+			return
+		}
+		record.isServerMemoryLimitSet = false
+	}
+	record.lastUpdateVariableTime = time.Now()
+}
+
+func (record *memoryUsageAlarm) initMemoryUsageAlarmRecord() {
+	record.lastCheckTime = time.Time{}
+	record.lastUpdateVariableTime = time.Time{}
+	record.updateVariable()
+	tidbLogDir, _ := filepath.Split(config.GetGlobalConfig().Log.File.Filename)
+	record.baseRecordDir = filepath.Join(tidbLogDir, "oom_record")
+	if record.err = disk.CheckAndCreateDir(record.baseRecordDir); record.err != nil {
+		return
+	}
+	// Read last records
+	recordDirs, err := os.ReadDir(record.baseRecordDir)
+	if err != nil {
+		record.err = err
+		return
+	}
+	for _, dir := range recordDirs {
+		name := filepath.Join(record.baseRecordDir, dir.Name())
+		if strings.Contains(dir.Name(), "record") {
+			record.lastRecordDirName = append(record.lastRecordDirName, name)
+		}
+	}
+	record.initialized = true
+}
+
+// If the server memory limit is set, use `serverMemoryLimit * MemoryUsageAlarmRatio` to check the OOM risk.
+// If it is not set, use `system total memory size * MemoryUsageAlarmRatio` to check the OOM risk.
+func (record *memoryUsageAlarm) alarm4ExcessiveMemUsage(sm util.SessionManager) {
+	if !record.initialized {
+		record.initMemoryUsageAlarmRecord()
+		if record.err != nil {
+			return
+		}
+	} else {
+		record.updateVariable()
+	}
+	if record.memoryUsageAlarmRatio <= 0.0 || record.memoryUsageAlarmRatio >= 1.0 {
+		return
+	}
+	var memoryUsage uint64
+	instanceStats := memory.ReadMemStats()
+	if record.isServerMemoryLimitSet {
+		memoryUsage = instanceStats.HeapAlloc
+	} else {
+		memoryUsage, record.err = memory.MemUsed()
+		if record.err != nil {
+			logutil.BgLogger().Error("get system memory usage fail", zap.Error(record.err))
+			return
+		}
+	}
+
+	// TODO: Consider NextGC to record SQLs.
+	if needRecord, reason := record.needRecord(memoryUsage); needRecord {
+		record.lastCheckTime = time.Now()
+		record.lastRecordMemUsed = memoryUsage
+		record.doRecord(memoryUsage, instanceStats.HeapAlloc, sm, reason)
+		record.tryRemoveRedundantRecords()
+	}
+}
+
+// AlarmReason is the reason for triggering the memory usage alarm.
+type AlarmReason uint
+
+const (
+	// GrowTooFast means the memory usage is growing too fast.
+	GrowTooFast AlarmReason = iota
+	// ExceedAlarmRatio means the memory usage exceeds the alarm ratio.
+	ExceedAlarmRatio
+	// NoReason means no alarm is triggered.
+	NoReason
+)
+
+func (reason AlarmReason) String() string {
+	return [...]string{"memory usage grows too fast", "memory usage exceeds alarm ratio", "no reason"}[reason]
+}
+
+func (record *memoryUsageAlarm) needRecord(memoryUsage uint64) (bool, AlarmReason) {
+	// Memory usage below `serverMemoryLimit * memoryUsageAlarmRatio` (default 70% of system memory) is never recorded.
+	// While usage stays above that threshold, record at most once every 60 seconds.
+	// If usage has grown by more than 10% of the limit since the last record, force a record immediately.
+	if float64(memoryUsage) <= float64(record.serverMemoryLimit)*record.memoryUsageAlarmRatio {
+		return false, NoReason
+	}
+
+	interval := time.Since(record.lastCheckTime)
+	memDiff := int64(memoryUsage) - int64(record.lastRecordMemUsed)
+	if interval > 60*time.Second {
+		return true, ExceedAlarmRatio
+	}
+	if float64(memDiff) > 0.1*float64(record.serverMemoryLimit) {
+		return true, GrowTooFast
+	}
+	return false, NoReason
+}
+
+func (record *memoryUsageAlarm) doRecord(memUsage uint64, instanceMemoryUsage uint64, sm util.SessionManager, alarmReason AlarmReason) {
+	fields := make([]zap.Field, 0, 6)
+	fields = append(fields, zap.Bool("is tidb_server_memory_limit set", record.isServerMemoryLimitSet))
+	if record.isServerMemoryLimitSet {
+		fields = append(fields, zap.Any("tidb_server_memory_limit", record.serverMemoryLimit))
+		fields = append(fields, zap.Any("tidb-server memory usage", memUsage))
+	} else {
+		fields = append(fields, zap.Any("system memory total", record.serverMemoryLimit))
+		fields = append(fields, zap.Any("system memory usage", memUsage))
+		fields = append(fields, zap.Any("tidb-server memory usage", instanceMemoryUsage))
+	}
+	fields = append(fields, zap.Any("memory-usage-alarm-ratio", record.memoryUsageAlarmRatio))
+	fields = append(fields, zap.Any("record path", record.baseRecordDir))
+	logutil.BgLogger().Warn(fmt.Sprintf("tidb-server has the risk of OOM because of %s. Running SQLs and heap profile will be recorded in record path", alarmReason.String()), fields...)
+ recordDir := filepath.Join(record.baseRecordDir, "record"+record.lastCheckTime.Format(time.RFC3339)) + if record.err = disk.CheckAndCreateDir(recordDir); record.err != nil { + return + } + record.lastRecordDirName = append(record.lastRecordDirName, recordDir) + if record.err = record.recordSQL(sm, recordDir); record.err != nil { + return + } + if record.err = record.recordProfile(recordDir); record.err != nil { + return + } +} + +func (record *memoryUsageAlarm) tryRemoveRedundantRecords() { + filename := &record.lastRecordDirName + for len(*filename) > int(record.memoryUsageAlarmKeepRecordNum) { + err := os.RemoveAll((*filename)[0]) + if err != nil { + logutil.BgLogger().Error("remove temp files failed", zap.Error(err)) + } + *filename = (*filename)[1:] + } +} + +func getPlanString(info *util.ProcessInfo) string { + var buf strings.Builder + rows := info.PlanExplainRows + buf.WriteString(fmt.Sprintf("|%v|%v|%v|%v|%v|", "id", "estRows", "task", "access object", "operator info")) + for _, row := range rows { + buf.WriteString(fmt.Sprintf("\n|%v|%v|%v|%v|%v|", row[0], row[1], row[2], row[3], row[4])) + } + return buf.String() +} + +func (record *memoryUsageAlarm) printTop10SqlInfo(pinfo []*util.ProcessInfo, f *os.File) { + if _, err := f.WriteString("The 10 SQLs with the most memory usage for OOM analysis\n"); err != nil { + logutil.BgLogger().Error("write top 10 memory sql info fail", zap.Error(err)) + } + memBuf := record.getTop10SqlInfoByMemoryUsage(pinfo) + if _, err := f.WriteString(memBuf.String()); err != nil { + logutil.BgLogger().Error("write top 10 memory sql info fail", zap.Error(err)) + } + if _, err := f.WriteString("The 10 SQLs with the most time usage for OOM analysis\n"); err != nil { + logutil.BgLogger().Error("write top 10 time cost sql info fail", zap.Error(err)) + } + costBuf := record.getTop10SqlInfoByCostTime(pinfo) + if _, err := f.WriteString(costBuf.String()); err != nil { + logutil.BgLogger().Error("write top 10 time cost sql info fail", zap.Error(err)) + } +} + +func (record *memoryUsageAlarm) getTop10SqlInfo(cmp func(i, j *util.ProcessInfo) bool, pinfo []*util.ProcessInfo) strings.Builder { + slices.SortFunc(pinfo, cmp) + list := pinfo + var buf strings.Builder + oomAction := variable.OOMAction.Load() + serverMemoryLimit := memory.ServerMemoryLimit.Load() + for i, totalCnt := 0, 10; i < len(list) && totalCnt > 0; i++ { + info := list[i] + buf.WriteString(fmt.Sprintf("SQL %v: \n", i)) + fields := util.GenLogFields(record.lastCheckTime.Sub(info.Time), info, false) + if fields == nil { + continue + } + fields = append(fields, zap.String("tidb_mem_oom_action", oomAction)) + fields = append(fields, zap.Uint64("tidb_server_memory_limit", serverMemoryLimit)) + fields = append(fields, zap.Int64("tidb_mem_quota_query", info.OOMAlarmVariablesInfo.SessionMemQuotaQuery)) + fields = append(fields, zap.Int("tidb_analyze_version", info.OOMAlarmVariablesInfo.SessionAnalyzeVersion)) + fields = append(fields, zap.Bool("tidb_enable_rate_limit_action", info.OOMAlarmVariablesInfo.SessionEnabledRateLimitAction)) + fields = append(fields, zap.String("current_analyze_plan", getPlanString(info))) + for _, field := range fields { + switch field.Type { + case zapcore.StringType: + buf.WriteString(fmt.Sprintf("%v: %v", field.Key, field.String)) + case zapcore.Uint8Type, zapcore.Uint16Type, zapcore.Uint32Type, zapcore.Uint64Type: + buf.WriteString(fmt.Sprintf("%v: %v", field.Key, uint64(field.Integer))) + case zapcore.Int8Type, zapcore.Int16Type, zapcore.Int32Type, zapcore.Int64Type: + 
buf.WriteString(fmt.Sprintf("%v: %v", field.Key, field.Integer)) + case zapcore.BoolType: + buf.WriteString(fmt.Sprintf("%v: %v", field.Key, field.Integer == 1)) + } + buf.WriteString("\n") + } + totalCnt-- + } + buf.WriteString("\n") + return buf +} + +func (record *memoryUsageAlarm) getTop10SqlInfoByMemoryUsage(pinfo []*util.ProcessInfo) strings.Builder { + return record.getTop10SqlInfo(func(i, j *util.ProcessInfo) bool { + return i.MemTracker.MaxConsumed() > j.MemTracker.MaxConsumed() + }, pinfo) +} + +func (record *memoryUsageAlarm) getTop10SqlInfoByCostTime(pinfo []*util.ProcessInfo) strings.Builder { + return record.getTop10SqlInfo(func(i, j *util.ProcessInfo) bool { + return i.Time.Before(j.Time) + }, pinfo) +} + +func (record *memoryUsageAlarm) recordSQL(sm util.SessionManager, recordDir string) error { + processInfo := sm.ShowProcessList() + pinfo := make([]*util.ProcessInfo, 0, len(processInfo)) + for _, info := range processInfo { + if len(info.Info) != 0 { + pinfo = append(pinfo, info) + } + } + fileName := filepath.Join(recordDir, "running_sql") + f, err := os.Create(fileName) + if err != nil { + logutil.BgLogger().Error("create oom record file fail", zap.Error(err)) + return err + } + defer func() { + err := f.Close() + if err != nil { + logutil.BgLogger().Error("close oom record file fail", zap.Error(err)) + } + }() + record.printTop10SqlInfo(pinfo, f) + return nil +} + +type item struct { + Name string + Debug int +} + +func (*memoryUsageAlarm) recordProfile(recordDir string) error { + items := []item{ + {Name: "heap"}, + {Name: "goroutine", Debug: 2}, + } + for _, item := range items { + if err := write(item, recordDir); err != nil { + return err + } + } + return nil +} + +func write(item item, recordDir string) error { + fileName := filepath.Join(recordDir, item.Name) + f, err := os.Create(fileName) + if err != nil { + logutil.BgLogger().Error(fmt.Sprintf("create %v profile file fail", item.Name), zap.Error(err)) + return err + } + p := rpprof.Lookup(item.Name) + err = p.WriteTo(f, item.Debug) + if err != nil { + logutil.BgLogger().Error(fmt.Sprintf("write %v profile file fail", item.Name), zap.Error(err)) + return err + } + + //nolint: revive + defer func() { + err := f.Close() + if err != nil { + logutil.BgLogger().Error(fmt.Sprintf("close %v profile file fail", item.Name), zap.Error(err)) + } + }() + return nil +} diff --git a/util/memoryusagealarm/memoryusagealarm_test.go b/util/memoryusagealarm/memoryusagealarm_test.go new file mode 100644 index 0000000000000..6e5147805676f --- /dev/null +++ b/util/memoryusagealarm/memoryusagealarm_test.go @@ -0,0 +1,145 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package memoryusagealarm + +import ( + "testing" + "time" + + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/memory" + "github.com/stretchr/testify/assert" +) + +func TestIfNeedDoRecord(t *testing.T) { + record := memoryUsageAlarm{} + record.initMemoryUsageAlarmRecord() + + // mem usage ratio < 70% will not be recorded + memUsed := 0.69 * float64(record.serverMemoryLimit) + needRecord, reason := record.needRecord(uint64(memUsed)) + assert.False(t, needRecord) + assert.Equal(t, NoReason, reason) + + // mem usage ratio > 70% will not be recorded + memUsed = 0.71 * float64(record.serverMemoryLimit) + needRecord, reason = record.needRecord(uint64(memUsed)) + assert.True(t, needRecord) + assert.Equal(t, ExceedAlarmRatio, reason) + record.lastCheckTime = time.Now() + record.lastRecordMemUsed = uint64(memUsed) + + // check time - last record time < 60s will not be recorded + memUsed = 0.71 * float64(record.serverMemoryLimit) + needRecord, reason = record.needRecord(uint64(memUsed)) + assert.False(t, needRecord) + assert.Equal(t, NoReason, reason) + + // check time - last record time > 60s will be recorded + record.lastCheckTime = record.lastCheckTime.Add(-60 * time.Second) + memUsed = 0.71 * float64(record.serverMemoryLimit) + needRecord, reason = record.needRecord(uint64(memUsed)) + assert.True(t, needRecord) + assert.Equal(t, ExceedAlarmRatio, reason) + record.lastCheckTime = time.Now() + record.lastRecordMemUsed = uint64(memUsed) + + // mem usage ratio - last mem usage ratio < 10% will not be recorded + memUsed = 0.80 * float64(record.serverMemoryLimit) + needRecord, reason = record.needRecord(uint64(memUsed)) + assert.False(t, needRecord) + assert.Equal(t, NoReason, reason) + + // mem usage ratio - last mem usage ratio > 10% will not be recorded even though check time - last record time + memUsed = 0.82 * float64(record.serverMemoryLimit) + needRecord, reason = record.needRecord(uint64(memUsed)) + assert.True(t, needRecord) + assert.Equal(t, GrowTooFast, reason) +} + +func genTime(sec int64) time.Time { + minStartTime := time.Date(1970, 1, 0, 0, 0, 0, 0, time.UTC).Unix() + return time.Unix(minStartTime+sec, 0) +} + +func TestGetTop10Sql(t *testing.T) { + record := memoryUsageAlarm{} + record.initMemoryUsageAlarmRecord() + record.lastCheckTime = genTime(123456) + + processInfoList := genMockProcessInfoList([]int64{1000, 87263523, 34223}, + []time.Time{genTime(1234), genTime(123456), genTime(12)}, + 3) + actual := record.getTop10SqlInfoByMemoryUsage(processInfoList) + assert.Equal(t, "SQL 0: \ncost_time: 0s\ntxn_start_ts: 0\nmem_max: 87263523 Bytes (83.2 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 1: \ncost_time: 123444s\ntxn_start_ts: 0\nmem_max: 34223 Bytes (33.4 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 2: \ncost_time: 122222s\ntxn_start_ts: 0\nmem_max: 1000 Bytes (1000 Bytes)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator 
info|\n\n", + actual.String()) + actual = record.getTop10SqlInfoByCostTime(processInfoList) + assert.Equal(t, "SQL 0: \ncost_time: 123444s\ntxn_start_ts: 0\nmem_max: 34223 Bytes (33.4 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 1: \ncost_time: 122222s\ntxn_start_ts: 0\nmem_max: 1000 Bytes (1000 Bytes)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 2: \ncost_time: 0s\ntxn_start_ts: 0\nmem_max: 87263523 Bytes (83.2 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\n\n", actual.String()) + + processInfoList = genMockProcessInfoList([]int64{1000, 87263523, 34223, 532355, 123225151, 231231515, 12312, 12515134234, 232, 12414, 15263236, 123123123, 15}, + []time.Time{genTime(1234), genTime(123456), genTime(12), genTime(3241), genTime(12515), genTime(3215), genTime(61314), genTime(12234), genTime(1123), genTime(512), genTime(11111), genTime(22222), genTime(5512)}, + 13) + actual = record.getTop10SqlInfoByMemoryUsage(processInfoList) + assert.Equal(t, "SQL 0: \ncost_time: 111222s\ntxn_start_ts: 0\nmem_max: 12515134234 Bytes (11.7 GB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 1: \ncost_time: 120241s\ntxn_start_ts: 0\nmem_max: 231231515 Bytes (220.5 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 2: \ncost_time: 110941s\ntxn_start_ts: 0\nmem_max: 123225151 Bytes (117.5 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 3: \ncost_time: 101234s\ntxn_start_ts: 0\nmem_max: 123123123 Bytes (117.4 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 4: \ncost_time: 0s\ntxn_start_ts: 0\nmem_max: 87263523 Bytes (83.2 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 5: \ncost_time: 112345s\ntxn_start_ts: 0\nmem_max: 15263236 Bytes (14.6 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 6: \ncost_time: 120215s\ntxn_start_ts: 0\nmem_max: 532355 Bytes (519.9 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: 
false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 7: \ncost_time: 123444s\ntxn_start_ts: 0\nmem_max: 34223 Bytes (33.4 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 8: \ncost_time: 122944s\ntxn_start_ts: 0\nmem_max: 12414 Bytes (12.1 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 9: \ncost_time: 62142s\ntxn_start_ts: 0\nmem_max: 12312 Bytes (12.0 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\n\n", actual.String()) + actual = record.getTop10SqlInfoByCostTime(processInfoList) + assert.Equal(t, "SQL 0: \ncost_time: 123444s\ntxn_start_ts: 0\nmem_max: 34223 Bytes (33.4 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 1: \ncost_time: 122944s\ntxn_start_ts: 0\nmem_max: 12414 Bytes (12.1 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 2: \ncost_time: 122333s\ntxn_start_ts: 0\nmem_max: 232 Bytes (232 Bytes)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 3: \ncost_time: 122222s\ntxn_start_ts: 0\nmem_max: 1000 Bytes (1000 Bytes)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 4: \ncost_time: 120241s\ntxn_start_ts: 0\nmem_max: 231231515 Bytes (220.5 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 5: \ncost_time: 120215s\ntxn_start_ts: 0\nmem_max: 532355 Bytes (519.9 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 6: \ncost_time: 117944s\ntxn_start_ts: 0\nmem_max: 15 Bytes (15 Bytes)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 7: \ncost_time: 112345s\ntxn_start_ts: 0\nmem_max: 15263236 Bytes (14.6 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 8: \ncost_time: 111222s\ntxn_start_ts: 0\nmem_max: 12515134234 
Bytes (11.7 GB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 9: \ncost_time: 110941s\ntxn_start_ts: 0\nmem_max: 123225151 Bytes (117.5 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\n\n", actual.String()) +} + +func genMockProcessInfoList(memConsumeList []int64, startTimeList []time.Time, size int) []*util.ProcessInfo { + processInfoList := make([]*util.ProcessInfo, 0, size) + for i := 0; i < size; i++ { + tracker := memory.NewTracker(0, 0) + tracker.Consume(memConsumeList[i]) + var stmtCtxRefCount stmtctx.ReferenceCount = 0 + processInfo := util.ProcessInfo{Time: startTimeList[i], + StmtCtx: &stmtctx.StatementContext{}, + MemTracker: tracker, + StatsInfo: func(interface{}) map[string]uint64 { + return map[string]uint64{} + }, + RefCountOfStmtCtx: &stmtCtxRefCount, + } + processInfoList = append(processInfoList, &processInfo) + } + return processInfoList +} + +func TestUpdateVariables(t *testing.T) { + variable.MemoryUsageAlarmRatio.Store(0.3) + variable.MemoryUsageAlarmKeepRecordNum.Store(3) + memory.ServerMemoryLimit.Store(1024) + + record := memoryUsageAlarm{} + + record.initMemoryUsageAlarmRecord() + assert.Equal(t, 0.3, record.memoryUsageAlarmRatio) + assert.Equal(t, int64(3), record.memoryUsageAlarmKeepRecordNum) + assert.Equal(t, uint64(1024), record.serverMemoryLimit) + variable.MemoryUsageAlarmRatio.Store(0.6) + variable.MemoryUsageAlarmKeepRecordNum.Store(6) + memory.ServerMemoryLimit.Store(2048) + + record.updateVariable() + assert.Equal(t, 0.3, record.memoryUsageAlarmRatio) + assert.Equal(t, int64(3), record.memoryUsageAlarmKeepRecordNum) + assert.Equal(t, uint64(1024), record.serverMemoryLimit) + record.lastUpdateVariableTime = record.lastUpdateVariableTime.Add(-60 * time.Second) + record.updateVariable() + assert.Equal(t, 0.6, record.memoryUsageAlarmRatio) + assert.Equal(t, int64(6), record.memoryUsageAlarmKeepRecordNum) + assert.Equal(t, uint64(2048), record.serverMemoryLimit) +} diff --git a/util/processinfo.go b/util/processinfo.go index 5b226cf3f1305..5746ce984453c 100644 --- a/util/processinfo.go +++ b/util/processinfo.go @@ -31,6 +31,7 @@ import ( // ProcessInfo is a struct used for show processlist statement. type ProcessInfo struct { +<<<<<<< HEAD ID uint64 User string Host string @@ -45,6 +46,26 @@ type ProcessInfo struct { CurTxnStartTS uint64 StmtCtx *stmtctx.StatementContext StatsInfo func(interface{}) map[string]uint64 +======= + Time time.Time + Plan interface{} + StmtCtx *stmtctx.StatementContext + RefCountOfStmtCtx *stmtctx.ReferenceCount + MemTracker *memory.Tracker + DiskTracker *disk.Tracker + StatsInfo func(interface{}) map[string]uint64 + RuntimeStatsColl *execdetails.RuntimeStatsColl + DB string + Digest string + Host string + User string + Info string + Port string + PlanExplainRows [][]string + OOMAlarmVariablesInfo OOMAlarmVariablesInfo + ID uint64 + CurTxnStartTS uint64 +>>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368)) // MaxExecutionTime is the timeout for select statement, in milliseconds. // If the query takes too long, kill it. 
MaxExecutionTime uint64 diff --git a/util/util.go b/util/util.go new file mode 100644 index 0000000000000..249db91f98d06 --- /dev/null +++ b/util/util.go @@ -0,0 +1,176 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strconv" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/parser" + "go.uber.org/zap" +) + +// SliceToMap converts slice to map +// nolint:unused +func SliceToMap(slice []string) map[string]interface{} { + sMap := make(map[string]interface{}) + for _, str := range slice { + sMap[str] = struct{}{} + } + return sMap +} + +// StringsToInterfaces converts string slice to interface slice +func StringsToInterfaces(strs []string) []interface{} { + is := make([]interface{}, 0, len(strs)) + for _, str := range strs { + is = append(is, str) + } + + return is +} + +// GetJSON fetches a page and parses it as JSON. The parsed result will be +// stored into the `v`. The variable `v` must be a pointer to a type that can be +// unmarshalled from JSON. +// +// Example: +// +// client := &http.Client{} +// var resp struct { IP string } +// if err := util.GetJSON(client, "http://api.ipify.org/?format=json", &resp); err != nil { +// return errors.Trace(err) +// } +// fmt.Println(resp.IP) +func GetJSON(client *http.Client, url string, v interface{}) error { + resp, err := client.Get(url) + if err != nil { + return errors.Trace(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return errors.Trace(err) + } + return errors.Errorf("get %s http status code != 200, message %s", url, string(body)) + } + + return errors.Trace(json.NewDecoder(resp.Body).Decode(v)) +} + +// ChanMap creates a channel which applies the function over the input Channel. +// Hint of Resource Leakage: +// In golang, channel isn't an interface so we must create a goroutine for handling the inputs. +// Hence the input channel must be closed properly or this function may leak a goroutine. +func ChanMap[T, R any](c <-chan T, f func(T) R) <-chan R { + outCh := make(chan R) + go func() { + defer close(outCh) + for item := range c { + outCh <- f(item) + } + }() + return outCh +} + +// Str2Int64Map converts a string to a map[int64]struct{}. +func Str2Int64Map(str string) map[int64]struct{} { + strs := strings.Split(str, ",") + res := make(map[int64]struct{}, len(strs)) + for _, s := range strs { + id, _ := strconv.ParseInt(s, 10, 64) + res[id] = struct{}{} + } + return res +} + +// GenLogFields generate log fields. 
+func GenLogFields(costTime time.Duration, info *ProcessInfo, needTruncateSQL bool) []zap.Field { + if info.RefCountOfStmtCtx != nil && !info.RefCountOfStmtCtx.TryIncrease() { + return nil + } + defer info.RefCountOfStmtCtx.Decrease() + + logFields := make([]zap.Field, 0, 20) + logFields = append(logFields, zap.String("cost_time", strconv.FormatFloat(costTime.Seconds(), 'f', -1, 64)+"s")) + execDetail := info.StmtCtx.GetExecDetails() + logFields = append(logFields, execDetail.ToZapFields()...) + if copTaskInfo := info.StmtCtx.CopTasksDetails(); copTaskInfo != nil { + logFields = append(logFields, copTaskInfo.ToZapFields()...) + } + if statsInfo := info.StatsInfo(info.Plan); len(statsInfo) > 0 { + var buf strings.Builder + firstComma := false + vStr := "" + for k, v := range statsInfo { + if v == 0 { + vStr = "pseudo" + } else { + vStr = strconv.FormatUint(v, 10) + } + if firstComma { + buf.WriteString("," + k + ":" + vStr) + } else { + buf.WriteString(k + ":" + vStr) + firstComma = true + } + } + logFields = append(logFields, zap.String("stats", buf.String())) + } + if info.ID != 0 { + logFields = append(logFields, zap.Uint64("conn_id", info.ID)) + } + if len(info.User) > 0 { + logFields = append(logFields, zap.String("user", info.User)) + } + if len(info.DB) > 0 { + logFields = append(logFields, zap.String("database", info.DB)) + } + var tableIDs, indexNames string + if len(info.StmtCtx.TableIDs) > 0 { + tableIDs = strings.Replace(fmt.Sprintf("%v", info.StmtCtx.TableIDs), " ", ",", -1) + logFields = append(logFields, zap.String("table_ids", tableIDs)) + } + if len(info.StmtCtx.IndexNames) > 0 { + indexNames = strings.Replace(fmt.Sprintf("%v", info.StmtCtx.IndexNames), " ", ",", -1) + logFields = append(logFields, zap.String("index_names", indexNames)) + } + logFields = append(logFields, zap.Uint64("txn_start_ts", info.CurTxnStartTS)) + if memTracker := info.MemTracker; memTracker != nil { + logFields = append(logFields, zap.String("mem_max", fmt.Sprintf("%d Bytes (%v)", memTracker.MaxConsumed(), memTracker.FormatBytes(memTracker.MaxConsumed())))) + } + + const logSQLLen = 1024 * 8 + var sql string + if len(info.Info) > 0 { + sql = info.Info + if info.RedactSQL { + sql = parser.Normalize(sql) + } + } + if len(sql) > logSQLLen && needTruncateSQL { + sql = fmt.Sprintf("%s len(%d)", sql[:logSQLLen], len(sql)) + } + logFields = append(logFields, zap.String("sql", sql)) + return logFields +} diff --git a/util/util_test.go b/util/util_test.go new file mode 100644 index 0000000000000..ca68a55cd8ba6 --- /dev/null +++ b/util/util_test.go @@ -0,0 +1,74 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package util + +import ( + "testing" + "time" + + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/util/memory" + "github.com/stretchr/testify/assert" +) + +func TestLogFormat(t *testing.T) { + mem := memory.NewTracker(-1, -1) + mem.Consume(1<<30 + 1<<29 + 1<<28 + 1<<27) + mockTooLongQuery := make([]byte, 1024*9) + + var refCount stmtctx.ReferenceCount = 0 + info := &ProcessInfo{ + ID: 233, + User: "PingCAP", + Host: "127.0.0.1", + DB: "Database", + Info: "select * from table where a > 1", + CurTxnStartTS: 23333, + StatsInfo: func(interface{}) map[string]uint64 { + return nil + }, + StmtCtx: &stmtctx.StatementContext{}, + RefCountOfStmtCtx: &refCount, + MemTracker: mem, + RedactSQL: false, + } + costTime := time.Second * 233 + logSQLTruncateLen := 1024 * 8 + logFields := GenLogFields(costTime, info, true) + + assert.Len(t, logFields, 7) + assert.Equal(t, "cost_time", logFields[0].Key) + assert.Equal(t, "233s", logFields[0].String) + assert.Equal(t, "conn_id", logFields[1].Key) + assert.Equal(t, int64(233), logFields[1].Integer) + assert.Equal(t, "user", logFields[2].Key) + assert.Equal(t, "PingCAP", logFields[2].String) + assert.Equal(t, "database", logFields[3].Key) + assert.Equal(t, "Database", logFields[3].String) + assert.Equal(t, "txn_start_ts", logFields[4].Key) + assert.Equal(t, int64(23333), logFields[4].Integer) + assert.Equal(t, "mem_max", logFields[5].Key) + assert.Equal(t, "2013265920 Bytes (1.88 GB)", logFields[5].String) + assert.Equal(t, "sql", logFields[6].Key) + assert.Equal(t, "select * from table where a > 1", logFields[6].String) + + logFields = GenLogFields(costTime, info, true) + assert.Equal(t, "select * from table where a > 1", logFields[6].String) + info.Info = string(mockTooLongQuery) + logFields = GenLogFields(costTime, info, true) + assert.Equal(t, len(logFields[6].String), logSQLTruncateLen+10) + logFields = GenLogFields(costTime, info, false) + assert.Equal(t, len(logFields[6].String), len(mockTooLongQuery)) +}
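The core of this cherry-pick is the stmtctx.ReferenceCount protocol. The sketch below shows how the two sides are intended to cooperate, using only the API added in this patch (TryIncrease/Decrease on the reader side, TryFreeze/UnFreeze on the owning session's side, plus StatementContext.GetExecDetails, which GenLogFields also calls); the helper names inspectStmtCtx and resetStmtCtx are illustrative and not part of the patch.

// Minimal sketch of the ReferenceCount protocol; helper names are illustrative.
package main

import (
	"fmt"

	"github.com/pingcap/tidb/sessionctx/stmtctx"
)

// inspectStmtCtx is what a background reader (e.g. the memory usage alarm)
// would do: pin the StmtCtx with TryIncrease before touching it.
func inspectStmtCtx(sc *stmtctx.StatementContext, ref *stmtctx.ReferenceCount) {
	if !ref.TryIncrease() {
		// The owning session is resetting the StmtCtx; skip this round.
		return
	}
	defer ref.Decrease()
	fmt.Println(sc.GetExecDetails()) // safe to read while the reference is held
}

// resetStmtCtx mirrors what InitStatementContext does in this patch:
// reuse the cached object only when no reader holds a reference.
func resetStmtCtx(cached *stmtctx.StatementContext, ref *stmtctx.ReferenceCount) *stmtctx.StatementContext {
	if ref.TryFreeze() {
		*cached = stmtctx.StatementContext{}
		ref.UnFreeze()
		return cached
	}
	// A reader got in first; fall back to a fresh allocation instead of
	// clearing the object under the reader's feet.
	return &stmtctx.StatementContext{}
}

func main() {
	var ref stmtctx.ReferenceCount // zero value == ReferenceCountNoReference
	sc := &stmtctx.StatementContext{}
	inspectStmtCtx(sc, &ref)
	sc = resetStmtCtx(sc, &ref)
	_ = sc
}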
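The patch defines memoryusagealarm.Handle but not the server-side wiring, which lands outside this diff. Assuming the same builder-style pattern used for the expensive-query handle, the hookup would look roughly like the sketch below; startMemoryUsageAlarm and its arguments are placeholders, not code from this patch.

// Hypothetical wiring of the alarm handle; not part of this patch.
package main

import (
	"github.com/pingcap/tidb/util"
	"github.com/pingcap/tidb/util/memoryusagealarm"
)

// startMemoryUsageAlarm follows the builder-style API defined in memoryusagealarm.go.
func startMemoryUsageAlarm(sm util.SessionManager, exitCh chan struct{}) {
	handle := memoryusagealarm.NewMemoryUsageAlarmHandle(exitCh).SetSessionManager(sm)
	// Run ticks every 100ms and, when it detects OOM risk, dumps the running
	// SQLs and heap/goroutine profiles under <log dir>/oom_record.
	go handle.Run()
}

func main() {} // placeholder so the sketch compiles as a standalone file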