From 685bcaaefc446464b5f198c721795ecf45be814b Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Fri, 25 Aug 2023 15:53:05 +0800 Subject: [PATCH] *: enable lfu for analyze (#46399) ref pingcap/tidb#46158 --- domain/domain_sysvars.go | 3 - executor/test/analyzetest/BUILD.bazel | 3 +- executor/test/analyzetest/analyze_test.go | 157 --------------- executor/test/analyzetest/main_test.go | 4 + .../analyzetest/memorycontrol/BUILD.bazel | 24 +++ .../analyzetest/memorycontrol/main_test.go | 36 ++++ .../memorycontrol/memory_control_test.go | 185 ++++++++++++++++++ .../handle/cache/internal/lfu/BUILD.bazel | 2 + .../handle/cache/internal/lfu/lfu_cache.go | 35 +++- .../cache/internal/lfu/lfu_cache_test.go | 8 + statistics/handle/handle.go | 1 + 11 files changed, 290 insertions(+), 168 deletions(-) create mode 100644 executor/test/analyzetest/memorycontrol/BUILD.bazel create mode 100644 executor/test/analyzetest/memorycontrol/main_test.go create mode 100644 executor/test/analyzetest/memorycontrol/memory_control_test.go diff --git a/domain/domain_sysvars.go b/domain/domain_sysvars.go index 68207a844ba6e..009d6a82c3fdf 100644 --- a/domain/domain_sysvars.go +++ b/domain/domain_sysvars.go @@ -20,9 +20,7 @@ import ( "time" "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/util/logutil" pd "github.com/tikv/pd/client" - "go.uber.org/zap" ) // initDomainSysVars() is called when a domain is initialized. @@ -46,7 +44,6 @@ func (do *Domain) initDomainSysVars() { // setStatsCacheCapacity sets statsCache cap func (do *Domain) setStatsCacheCapacity(c int64) { do.StatsHandle().SetStatsCacheCapacity(c) - logutil.BgLogger().Info("update stats cache capacity successfully", zap.Int64("capacity", c)) } func (do *Domain) setPDClientDynamicOption(name, sVal string) { diff --git a/executor/test/analyzetest/BUILD.bazel b/executor/test/analyzetest/BUILD.bazel index 7601693fb0ef1..fc2e852de73bd 100644 --- a/executor/test/analyzetest/BUILD.bazel +++ b/executor/test/analyzetest/BUILD.bazel @@ -10,6 +10,7 @@ go_test( flaky = True, shard_count = 50, deps = [ + "//config", "//domain", "//domain/infosync", "//errno", @@ -29,10 +30,8 @@ go_test( "//tablecodec", "//testkit", "//types", - "//util", "//util/codec", "//util/dbterror/exeerrors", - "//util/memory", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", diff --git a/executor/test/analyzetest/analyze_test.go b/executor/test/analyzetest/analyze_test.go index ff13f753f912c..37cfcd5ce8478 100644 --- a/executor/test/analyzetest/analyze_test.go +++ b/executor/test/analyzetest/analyze_test.go @@ -17,7 +17,6 @@ package analyzetest import ( "context" "fmt" - "runtime" "strconv" "strings" "testing" @@ -44,10 +43,8 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/dbterror/exeerrors" - "github.com/pingcap/tidb/util/memory" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" @@ -3064,160 +3061,6 @@ func TestAutoAnalyzeAwareGlobalVariableChange(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/injectBaseModifyCount")) } -func TestGlobalMemoryControlForAnalyze(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomain(t) - - tk0 := testkit.NewTestKit(t, store) - tk0.MustExec("set global tidb_mem_oom_action = 'cancel'") - tk0.MustExec("set global tidb_server_memory_limit = 512MB") - tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128") - - sm := &testkit.MockSessionManager{ - PS: []*util.ProcessInfo{tk0.Session().ShowProcess()}, - } - dom.ServerMemoryLimitHandle().SetSessionManager(sm) - go dom.ServerMemoryLimitHandle().Run() - - tk0.MustExec("use test") - tk0.MustExec("create table t(a int)") - tk0.MustExec("insert into t select 1") - for i := 1; i <= 8; i++ { - tk0.MustExec("insert into t select * from t") // 256 Lines - } - sql := "analyze table t with 1.0 samplerate;" // Need about 100MB - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`)) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`)) - _, err := tk0.Exec(sql) - require.True(t, strings.Contains(err.Error(), memory.PanicMemoryExceedWarnMsg+memory.WarnMsgSuffixForInstance)) - runtime.GC() - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume")) - tk0.MustExec(sql) -} - -func TestGlobalMemoryControlForPrepareAnalyze(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomain(t) - - tk0 := testkit.NewTestKit(t, store) - tk0.MustExec("set global tidb_mem_oom_action = 'cancel'") - tk0.MustExec("set global tidb_mem_quota_query = 209715200 ") // 200MB - tk0.MustExec("set global tidb_server_memory_limit = 5GB") - tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128") - - sm := &testkit.MockSessionManager{ - PS: []*util.ProcessInfo{tk0.Session().ShowProcess()}, - } - dom.ServerMemoryLimitHandle().SetSessionManager(sm) - go dom.ServerMemoryLimitHandle().Run() - - tk0.MustExec("use test") - tk0.MustExec("create table t(a int)") - tk0.MustExec("insert into t select 1") - for i := 1; i <= 8; i++ { - tk0.MustExec("insert into t select * from t") // 256 Lines - } - sqlPrepare := "prepare stmt from 'analyze table t with 1.0 samplerate';" - sqlExecute := "execute stmt;" // Need about 100MB - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`)) // 512MB - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`)) - // won't be killed by tidb_mem_quota_query - tk0.MustExec(sqlPrepare) - tk0.MustExec(sqlExecute) - runtime.GC() - // killed by tidb_server_memory_limit - tk0.MustExec("set global tidb_server_memory_limit = 512MB") - _, err0 := tk0.Exec(sqlPrepare) - require.NoError(t, err0) - _, err1 := tk0.Exec(sqlExecute) - // Killed and the WarnMsg is WarnMsgSuffixForInstance instead of WarnMsgSuffixForSingleQuery - require.True(t, strings.Contains(err1.Error(), memory.PanicMemoryExceedWarnMsg+memory.WarnMsgSuffixForInstance)) - runtime.GC() - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume")) - tk0.MustExec(sqlPrepare) - tk0.MustExec(sqlExecute) -} - -func TestGlobalMemoryControlForAutoAnalyze(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomain(t) - tk := testkit.NewTestKit(t, store) - originalVal1 := tk.MustQuery("select @@global.tidb_mem_oom_action").Rows()[0][0].(string) - tk.MustExec("set global tidb_mem_oom_action = 'cancel'") - //originalVal2 := tk.MustQuery("select @@global.tidb_server_memory_limit").Rows()[0][0].(string) - tk.MustExec("set global tidb_server_memory_limit = 512MB") - originalVal3 := tk.MustQuery("select @@global.tidb_server_memory_limit_sess_min_size").Rows()[0][0].(string) - tk.MustExec("set global tidb_server_memory_limit_sess_min_size = 128") - defer func() { - tk.MustExec(fmt.Sprintf("set global tidb_mem_oom_action = %v", originalVal1)) - //tk.MustExec(fmt.Sprintf("set global tidb_server_memory_limit = %v", originalVal2)) - tk.MustExec(fmt.Sprintf("set global tidb_server_memory_limit_sess_min_size = %v", originalVal3)) - }() - - // clean child trackers - oldChildTrackers := executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest() - for _, tracker := range oldChildTrackers { - tracker.Detach() - } - defer func() { - for _, tracker := range oldChildTrackers { - tracker.AttachTo(executor.GlobalAnalyzeMemoryTracker) - } - }() - childTrackers := executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest() - require.Len(t, childTrackers, 0) - - tk.MustExec("use test") - tk.MustExec("create table t(a int)") - tk.MustExec("insert into t select 1") - for i := 1; i <= 8; i++ { - tk.MustExec("insert into t select * from t") // 256 Lines - } - _, err0 := tk.Exec("analyze table t with 1.0 samplerate;") - require.NoError(t, err0) - rs0 := tk.MustQuery("select fail_reason from mysql.analyze_jobs where table_name=? and state=? limit 1", "t", "failed") - require.Len(t, rs0.Rows(), 0) - - h := dom.StatsHandle() - originalVal4 := handle.AutoAnalyzeMinCnt - originalVal5 := tk.MustQuery("select @@global.tidb_auto_analyze_ratio").Rows()[0][0].(string) - handle.AutoAnalyzeMinCnt = 0 - tk.MustExec("set global tidb_auto_analyze_ratio = 0.001") - defer func() { - handle.AutoAnalyzeMinCnt = originalVal4 - tk.MustExec(fmt.Sprintf("set global tidb_auto_analyze_ratio = %v", originalVal5)) - }() - - sm := &testkit.MockSessionManager{ - Dom: dom, - PS: []*util.ProcessInfo{tk.Session().ShowProcess()}, - } - dom.ServerMemoryLimitHandle().SetSessionManager(sm) - go dom.ServerMemoryLimitHandle().Run() - - tk.MustExec("insert into t values(4),(5),(6)") - require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll)) - err := h.Update(dom.InfoSchema()) - require.NoError(t, err) - - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`)) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`)) - defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume")) - }() - tk.MustQuery("select 1") - childTrackers = executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest() - require.Len(t, childTrackers, 0) - - h.HandleAutoAnalyze(dom.InfoSchema()) - rs := tk.MustQuery("select fail_reason from mysql.analyze_jobs where table_name=? and state=? limit 1", "t", "failed") - failReason := rs.Rows()[0][0].(string) - require.True(t, strings.Contains(failReason, memory.PanicMemoryExceedWarnMsg+memory.WarnMsgSuffixForInstance)) - - childTrackers = executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest() - require.Len(t, childTrackers, 0) -} - func TestAnalyzeColumnsSkipMVIndexJsonCol(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) diff --git a/executor/test/analyzetest/main_test.go b/executor/test/analyzetest/main_test.go index 713ca5941822a..1141e30df2af8 100644 --- a/executor/test/analyzetest/main_test.go +++ b/executor/test/analyzetest/main_test.go @@ -17,10 +17,14 @@ package analyzetest import ( "testing" + "github.com/pingcap/tidb/config" "go.uber.org/goleak" ) func TestMain(m *testing.M) { + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.EnableStatsCacheMemQuota = true + }) opts := []goleak.Option{ goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), diff --git a/executor/test/analyzetest/memorycontrol/BUILD.bazel b/executor/test/analyzetest/memorycontrol/BUILD.bazel new file mode 100644 index 0000000000000..658da93663b81 --- /dev/null +++ b/executor/test/analyzetest/memorycontrol/BUILD.bazel @@ -0,0 +1,24 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "memorycontrol_test", + timeout = "short", + srcs = [ + "main_test.go", + "memory_control_test.go", + ], + flaky = True, + shard_count = 3, + deps = [ + "//config", + "//executor", + "//sessionctx/variable", + "//statistics/handle", + "//testkit", + "//util", + "//util/memory", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//require", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/executor/test/analyzetest/memorycontrol/main_test.go b/executor/test/analyzetest/memorycontrol/main_test.go new file mode 100644 index 0000000000000..8efcfe4ba08b1 --- /dev/null +++ b/executor/test/analyzetest/memorycontrol/main_test.go @@ -0,0 +1,36 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memorycontrol + +import ( + "testing" + + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/sessionctx/variable" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.EnableStatsCacheMemQuota = true + }) + variable.StatsCacheMemQuota.Store(1000000) + opts := []goleak.Option{ + goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), + goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + } + goleak.VerifyTestMain(m, opts...) +} diff --git a/executor/test/analyzetest/memorycontrol/memory_control_test.go b/executor/test/analyzetest/memorycontrol/memory_control_test.go new file mode 100644 index 0000000000000..c7c058e25b73d --- /dev/null +++ b/executor/test/analyzetest/memorycontrol/memory_control_test.go @@ -0,0 +1,185 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memorycontrol + +import ( + "fmt" + "runtime" + "strings" + "testing" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/executor" + "github.com/pingcap/tidb/statistics/handle" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/memory" + "github.com/stretchr/testify/require" +) + +func TestGlobalMemoryControlForAnalyze(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk0 := testkit.NewTestKit(t, store) + tk0.MustExec("set global tidb_mem_oom_action = 'cancel'") + tk0.MustExec("set global tidb_server_memory_limit = 512MB") + tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128") + + sm := &testkit.MockSessionManager{ + PS: []*util.ProcessInfo{tk0.Session().ShowProcess()}, + } + dom.ServerMemoryLimitHandle().SetSessionManager(sm) + go dom.ServerMemoryLimitHandle().Run() + + tk0.MustExec("use test") + tk0.MustExec("create table t(a int)") + tk0.MustExec("insert into t select 1") + for i := 1; i <= 8; i++ { + tk0.MustExec("insert into t select * from t") // 256 Lines + } + sql := "analyze table t with 1.0 samplerate;" // Need about 100MB + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`)) + _, err := tk0.Exec(sql) + require.True(t, strings.Contains(err.Error(), memory.PanicMemoryExceedWarnMsg+memory.WarnMsgSuffixForInstance)) + runtime.GC() + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume")) + tk0.MustExec(sql) +} + +func TestGlobalMemoryControlForPrepareAnalyze(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk0 := testkit.NewTestKit(t, store) + tk0.MustExec("set global tidb_mem_oom_action = 'cancel'") + tk0.MustExec("set global tidb_mem_quota_query = 209715200 ") // 200MB + tk0.MustExec("set global tidb_server_memory_limit = 5GB") + tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128") + + sm := &testkit.MockSessionManager{ + PS: []*util.ProcessInfo{tk0.Session().ShowProcess()}, + } + dom.ServerMemoryLimitHandle().SetSessionManager(sm) + go dom.ServerMemoryLimitHandle().Run() + + tk0.MustExec("use test") + tk0.MustExec("create table t(a int)") + tk0.MustExec("insert into t select 1") + for i := 1; i <= 8; i++ { + tk0.MustExec("insert into t select * from t") // 256 Lines + } + sqlPrepare := "prepare stmt from 'analyze table t with 1.0 samplerate';" + sqlExecute := "execute stmt;" // Need about 100MB + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`)) // 512MB + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`)) + runtime.GC() + // won't be killed by tidb_mem_quota_query + tk0.MustExec(sqlPrepare) + tk0.MustExec(sqlExecute) + runtime.GC() + // killed by tidb_server_memory_limit + tk0.MustExec("set global tidb_server_memory_limit = 512MB") + _, err0 := tk0.Exec(sqlPrepare) + require.NoError(t, err0) + _, err1 := tk0.Exec(sqlExecute) + // Killed and the WarnMsg is WarnMsgSuffixForInstance instead of WarnMsgSuffixForSingleQuery + require.True(t, strings.Contains(err1.Error(), memory.PanicMemoryExceedWarnMsg+memory.WarnMsgSuffixForInstance)) + runtime.GC() + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume")) + tk0.MustExec(sqlPrepare) + tk0.MustExec(sqlExecute) +} + +func TestGlobalMemoryControlForAutoAnalyze(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + originalVal1 := tk.MustQuery("select @@global.tidb_mem_oom_action").Rows()[0][0].(string) + tk.MustExec("set global tidb_mem_oom_action = 'cancel'") + //originalVal2 := tk.MustQuery("select @@global.tidb_server_memory_limit").Rows()[0][0].(string) + tk.MustExec("set global tidb_server_memory_limit = 512MB") + originalVal3 := tk.MustQuery("select @@global.tidb_server_memory_limit_sess_min_size").Rows()[0][0].(string) + tk.MustExec("set global tidb_server_memory_limit_sess_min_size = 128") + defer func() { + tk.MustExec(fmt.Sprintf("set global tidb_mem_oom_action = %v", originalVal1)) + //tk.MustExec(fmt.Sprintf("set global tidb_server_memory_limit = %v", originalVal2)) + tk.MustExec(fmt.Sprintf("set global tidb_server_memory_limit_sess_min_size = %v", originalVal3)) + }() + + // clean child trackers + oldChildTrackers := executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest() + for _, tracker := range oldChildTrackers { + tracker.Detach() + } + defer func() { + for _, tracker := range oldChildTrackers { + tracker.AttachTo(executor.GlobalAnalyzeMemoryTracker) + } + }() + childTrackers := executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest() + require.Len(t, childTrackers, 0) + + tk.MustExec("use test") + tk.MustExec("create table t(a int)") + tk.MustExec("insert into t select 1") + for i := 1; i <= 8; i++ { + tk.MustExec("insert into t select * from t") // 256 Lines + } + _, err0 := tk.Exec("analyze table t with 1.0 samplerate;") + require.NoError(t, err0) + rs0 := tk.MustQuery("select fail_reason from mysql.analyze_jobs where table_name=? and state=? limit 1", "t", "failed") + require.Len(t, rs0.Rows(), 0) + + h := dom.StatsHandle() + originalVal4 := handle.AutoAnalyzeMinCnt + originalVal5 := tk.MustQuery("select @@global.tidb_auto_analyze_ratio").Rows()[0][0].(string) + handle.AutoAnalyzeMinCnt = 0 + tk.MustExec("set global tidb_auto_analyze_ratio = 0.001") + defer func() { + handle.AutoAnalyzeMinCnt = originalVal4 + tk.MustExec(fmt.Sprintf("set global tidb_auto_analyze_ratio = %v", originalVal5)) + }() + + sm := &testkit.MockSessionManager{ + Dom: dom, + PS: []*util.ProcessInfo{tk.Session().ShowProcess()}, + } + dom.ServerMemoryLimitHandle().SetSessionManager(sm) + go dom.ServerMemoryLimitHandle().Run() + + tk.MustExec("insert into t values(4),(5),(6)") + require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll)) + err := h.Update(dom.InfoSchema()) + require.NoError(t, err) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume")) + }() + tk.MustQuery("select 1") + childTrackers = executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest() + require.Len(t, childTrackers, 0) + + h.HandleAutoAnalyze(dom.InfoSchema()) + rs := tk.MustQuery("select fail_reason from mysql.analyze_jobs where table_name=? and state=? limit 1", "t", "failed") + failReason := rs.Rows()[0][0].(string) + require.True(t, strings.Contains(failReason, memory.PanicMemoryExceedWarnMsg+memory.WarnMsgSuffixForInstance)) + + childTrackers = executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest() + require.Len(t, childTrackers, 0) +} diff --git a/statistics/handle/cache/internal/lfu/BUILD.bazel b/statistics/handle/cache/internal/lfu/BUILD.bazel index f37969f4312d1..8304d79e35039 100644 --- a/statistics/handle/cache/internal/lfu/BUILD.bazel +++ b/statistics/handle/cache/internal/lfu/BUILD.bazel @@ -14,10 +14,12 @@ go_library( "//statistics/handle/cache/internal", "//statistics/handle/cache/internal/metrics", "//util/intest", + "//util/logutil", "//util/mathutil", "//util/memory", "@com_github_dgraph_io_ristretto//:ristretto", "@org_golang_x_exp//maps", + "@org_uber_go_zap//:zap", ], ) diff --git a/statistics/handle/cache/internal/lfu/lfu_cache.go b/statistics/handle/cache/internal/lfu/lfu_cache.go index e31c202d6eddb..c0540d86ef536 100644 --- a/statistics/handle/cache/internal/lfu/lfu_cache.go +++ b/statistics/handle/cache/internal/lfu/lfu_cache.go @@ -23,8 +23,10 @@ import ( "github.com/pingcap/tidb/statistics/handle/cache/internal" "github.com/pingcap/tidb/statistics/handle/cache/internal/metrics" "github.com/pingcap/tidb/util/intest" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tidb/util/memory" + "go.uber.org/zap" ) // LFU is a LFU based on the ristretto.Cache @@ -35,14 +37,20 @@ type LFU struct { closeOnce sync.Once } +var testMode = false + // NewLFU creates a new LFU cache. func NewLFU(totalMemCost int64) (*LFU, error) { if totalMemCost == 0 { - memTotal, err := memory.MemTotal() - if err != nil { - return nil, err + if intest.InTest { + totalMemCost = 5000000 + } else { + memTotal, err := memory.MemTotal() + if err != nil { + return nil, err + } + totalMemCost = int64(memTotal / 2) } - totalMemCost = int64(memTotal / 2) } metrics.CapacityGauge.Set(float64(totalMemCost)) result := &LFU{} @@ -55,8 +63,8 @@ func NewLFU(totalMemCost int64) (*LFU, error) { OnEvict: result.onEvict, OnExit: result.onExit, OnReject: result.onReject, - IgnoreInternalCost: intest.InTest, - Metrics: intest.InTest, + IgnoreInternalCost: testMode, + Metrics: testMode, }) if err != nil { return nil, err @@ -115,11 +123,21 @@ func DropEvicted(item statistics.TableCacheItem) { } func (s *LFU) onReject(item *ristretto.Item) { + defer func() { + if r := recover(); r != nil { + logutil.BgLogger().Warn("panic in onReject", zap.Any("error", r), zap.Stack("stack")) + } + }() metrics.RejectCounter.Add(1.0) s.dropMemory(item) } func (s *LFU) onEvict(item *ristretto.Item) { + defer func() { + if r := recover(); r != nil { + logutil.BgLogger().Warn("panic in onEvict", zap.Any("error", r), zap.Stack("stack")) + } + }() s.dropMemory(item) metrics.EvictCounter.Inc() } @@ -150,6 +168,11 @@ func (s *LFU) dropMemory(item *ristretto.Item) { } func (s *LFU) onExit(val any) { + defer func() { + if r := recover(); r != nil { + logutil.BgLogger().Warn("panic in onExit", zap.Any("error", r), zap.Stack("stack")) + } + }() if val == nil { // Sometimes the same key may be passed to the "onEvict/onExit" function twice, // and in the second invocation, the value is empty, so it should not be processed. diff --git a/statistics/handle/cache/internal/lfu/lfu_cache_test.go b/statistics/handle/cache/internal/lfu/lfu_cache_test.go index 107a6777718e8..5207f1a6a621b 100644 --- a/statistics/handle/cache/internal/lfu/lfu_cache_test.go +++ b/statistics/handle/cache/internal/lfu/lfu_cache_test.go @@ -32,6 +32,7 @@ var ( ) func TestLFUPutGetDel(t *testing.T) { + testMode = true capacity := int64(100) lfu, err := NewLFU(capacity) require.NoError(t, err) @@ -49,6 +50,7 @@ func TestLFUPutGetDel(t *testing.T) { } func TestLFUFreshMemUsage(t *testing.T) { + testMode = true lfu, err := NewLFU(10000) require.NoError(t, err) t1 := testutil.NewMockStatisticsTable(1, 1, true, false, false) @@ -83,6 +85,7 @@ func TestLFUFreshMemUsage(t *testing.T) { } func TestLFUPutTooBig(t *testing.T) { + testMode = true lfu, err := NewLFU(1) require.NoError(t, err) mockTable := testutil.NewMockStatisticsTable(1, 1, true, false, false) @@ -95,6 +98,7 @@ func TestLFUPutTooBig(t *testing.T) { } func TestCacheLen(t *testing.T) { + testMode = true capacity := int64(12) lfu, err := NewLFU(capacity) require.NoError(t, err) @@ -117,6 +121,7 @@ func TestCacheLen(t *testing.T) { } func TestLFUCachePutGetWithManyConcurrency(t *testing.T) { + testMode = true // to test DATA RACE capacity := int64(100000000000) lfu, err := NewLFU(capacity) @@ -142,6 +147,7 @@ func TestLFUCachePutGetWithManyConcurrency(t *testing.T) { } func TestLFUCachePutGetWithManyConcurrency2(t *testing.T) { + testMode = true // to test DATA RACE capacity := int64(100000000000) lfu, err := NewLFU(capacity) @@ -172,6 +178,7 @@ func TestLFUCachePutGetWithManyConcurrency2(t *testing.T) { } func TestLFUCachePutGetWithManyConcurrencyAndSmallConcurrency(t *testing.T) { + testMode = true // to test DATA RACE capacity := int64(100) @@ -237,6 +244,7 @@ func checkTable(t *testing.T, tbl *statistics.Table) { } func TestLFUReject(t *testing.T) { + testMode = true capacity := int64(100000000000) lfu, err := NewLFU(capacity) require.NoError(t, err) diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 5bce85c55ccb6..9582bf0fbd7ee 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -2287,6 +2287,7 @@ func (h *Handle) SetStatsCacheCapacity(c int64) { } sc := v sc.SetCapacity(c) + logutil.BgLogger().Info("update stats cache capacity successfully", zap.Int64("capacity", c)) } // Close stops the background