Skip to content

Commit

Permalink
*: fix ctrl+c can not kill tidb during statistics init (#54594)
Browse files Browse the repository at this point in the history
close #54589
  • Loading branch information
tiancaiamao authored Jul 15, 2024
1 parent a453b81 commit 06e0e17
Show file tree
Hide file tree
Showing 34 changed files with 323 additions and 289 deletions.
8 changes: 4 additions & 4 deletions pkg/ddl/tests/partition/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3042,7 +3042,7 @@ func TestRemoveKeyPartitioning(t *testing.T) {
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
// And also cached and lazy loaded
h.Clear()
require.NoError(t, h.Update(dom.InfoSchema()))
require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))
tk.MustQuery(`show stats_meta where db_name = 'RemovePartitioning' and table_name = 't'`).Sort().CheckAt([]int{0, 1, 2, 4, 5}, [][]any{
{"RemovePartitioning", "t", "", "0", "95"}})
tk.MustExec(`analyze table t`)
Expand Down Expand Up @@ -3088,7 +3088,7 @@ func TestRemoveListPartitioning(t *testing.T) {
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
// And also cached and lazy loaded
h.Clear()
require.NoError(t, h.Update(dom.InfoSchema()))
require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))
tk.MustQuery(`show stats_meta where db_name = 'RemoveListPartitioning' and table_name = 't'`).Sort().CheckAt([]int{0, 1, 2, 4, 5}, [][]any{
{"RemoveListPartitioning", "t", "", "0", "95"}})
tk.MustExec(`analyze table t`)
Expand Down Expand Up @@ -3134,7 +3134,7 @@ func TestRemoveListColumnPartitioning(t *testing.T) {
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
// And also cached and lazy loaded
h.Clear()
require.NoError(t, h.Update(dom.InfoSchema()))
require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))
tk.MustQuery(`show stats_meta where db_name = 'RemoveListPartitioning' and table_name = 't'`).Sort().CheckAt([]int{0, 1, 2, 4, 5}, [][]any{
{"RemoveListPartitioning", "t", "", "0", "95"}})
tk.MustExec(`analyze table t`)
Expand Down Expand Up @@ -3180,7 +3180,7 @@ func TestRemoveListColumnsPartitioning(t *testing.T) {
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
// And also cached and lazy loaded
h.Clear()
require.NoError(t, h.Update(dom.InfoSchema()))
require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))
tk.MustQuery(`show stats_meta where db_name = 'RemoveListPartitioning' and table_name = 't'`).Sort().CheckAt([]int{0, 1, 2, 4, 5}, [][]any{
{"RemoveListPartitioning", "t", "", "0", "95"}})
tk.MustExec(`analyze table t`)
Expand Down
32 changes: 24 additions & 8 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2334,14 +2334,22 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
// Otherwise, we may start the auto analyze worker before the stats cache is initialized.
do.wg.Run(
func() {
<-do.StatsHandle().InitStatsDone
select {
case <-do.StatsHandle().InitStatsDone:
case <-do.exit: // It may happen that before initStatsDone, tidb receive Ctrl+C
return
}
do.autoAnalyzeWorker(owner)
},
"autoAnalyzeWorker",
)
do.wg.Run(
func() {
<-do.StatsHandle().InitStatsDone
select {
case <-do.StatsHandle().InitStatsDone:
case <-do.exit: // It may happen that before initStatsDone, tidb receive Ctrl+C
return
}
do.analyzeJobsCleanupWorker(owner)
},
"analyzeJobsCleanupWorker",
Expand All @@ -2360,7 +2368,11 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
case <-waitRetry:
}
}
<-do.StatsHandle().InitStatsDone
select {
case <-do.StatsHandle().InitStatsDone:
case <-do.exit: // It may happen that before initStatsDone, tidb receive Ctrl+C
return
}
infosync.DeleteInternalSession(initStatsCtx)
},
"RemoveInitStatsFromInternalSessions",
Expand Down Expand Up @@ -2399,7 +2411,7 @@ func (do *Domain) newOwnerManager(prompt, ownerKey string) owner.Manager {
return statsOwner
}

func (do *Domain) initStats() {
func (do *Domain) initStats(ctx context.Context) {
statsHandle := do.StatsHandle()
defer func() {
if r := recover(); r != nil {
Expand All @@ -2412,9 +2424,9 @@ func (do *Domain) initStats() {
liteInitStats := config.GetGlobalConfig().Performance.LiteInitStats
var err error
if liteInitStats {
err = statsHandle.InitStatsLite(do.InfoSchema())
err = statsHandle.InitStatsLite(ctx, do.InfoSchema())
} else {
err = statsHandle.InitStats(do.InfoSchema())
err = statsHandle.InitStats(ctx, do.InfoSchema())
}
if err != nil {
logutil.BgLogger().Error("init stats info failed", zap.Bool("lite", liteInitStats), zap.Duration("take time", time.Since(t)), zap.Error(err))
Expand All @@ -2436,13 +2448,17 @@ func (do *Domain) loadStatsWorker() {
updStatsHealthyTicker.Stop()
logutil.BgLogger().Info("loadStatsWorker exited.")
}()
do.initStats()

ctx, cancelFunc := context.WithCancel(context.Background())
do.cancelFns = append(do.cancelFns, cancelFunc)

do.initStats(ctx)
statsHandle := do.StatsHandle()
var err error
for {
select {
case <-loadTicker.C:
err = statsHandle.Update(do.InfoSchema())
err = statsHandle.Update(ctx, do.InfoSchema())
if err != nil {
logutil.BgLogger().Debug("update stats info failed", zap.Error(err))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ TASKLOOP:
if err != nil {
sessionVars.StmtCtx.AppendWarning(err)
}
return statsHandle.Update(infoSchema)
return statsHandle.Update(ctx, infoSchema)
}

func (e *AnalyzeExec) waitFinish(ctx context.Context, g *errgroup.Group, resultsCh chan *statistics.AnalyzeResults) error {
Expand Down
14 changes: 7 additions & 7 deletions pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,22 +175,22 @@ func TestDataForTableStatsField(t *testing.T) {
testkit.Rows("0 0 0 0"))
tk.MustExec(`insert into t(c, d, e) values(1, 2, "c"), (2, 3, "d"), (3, 4, "e")`)
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(is))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("3 18 54 6"))
tk.MustExec(`insert into t(c, d, e) values(4, 5, "f")`)
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(is))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("4 18 72 8"))
tk.MustExec("delete from t where c >= 3")
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(is))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("2 18 36 4"))
tk.MustExec("delete from t where c=3")
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(is))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("2 18 36 4"))

Expand All @@ -200,7 +200,7 @@ func TestDataForTableStatsField(t *testing.T) {
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
tk.MustExec(`insert into t(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e")`)
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(is))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("3 18 54 6"))
}
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestPartitionsTable(t *testing.T) {
"[0 0 0 0]\n" +
"[0 0 0 0"))
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(is))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.PARTITIONS where table_name='test_partitions';").Check(
testkit.Rows("" +
"1 18 18 2]\n" +
Expand All @@ -245,7 +245,7 @@ func TestPartitionsTable(t *testing.T) {
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
tk.MustExec(`insert into test_partitions_1(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e");`)
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(is))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select PARTITION_NAME, TABLE_ROWS, AVG_ROW_LENGTH, DATA_LENGTH, INDEX_LENGTH from information_schema.PARTITIONS where table_name='test_partitions_1';").Check(
testkit.Rows("<nil> 3 18 54 6"))

Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/show_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func TestShowStatsExtended(t *testing.T) {
"s1 2",
"s2 2",
))
dom.StatsHandle().Update(dom.InfoSchema())
dom.StatsHandle().Update(context.Background(), dom.InfoSchema())
result = tk.MustQuery("show stats_extended where db_name = 'test' and table_name = 't'")
require.Len(t, result.Rows(), 0)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (e *SimpleExec) Next(ctx context.Context, _ *chunk.Chunk) (err error) {
// We just ignore it.
return nil
case *ast.DropStatsStmt:
err = e.executeDropStats(x)
err = e.executeDropStats(ctx, x)
case *ast.SetRoleStmt:
err = e.executeSetRole(x)
case *ast.RevokeRoleStmt:
Expand Down Expand Up @@ -2734,7 +2734,7 @@ func (e *SimpleExec) executeAlterInstance(s *ast.AlterInstanceStmt) error {
return nil
}

func (e *SimpleExec) executeDropStats(s *ast.DropStatsStmt) (err error) {
func (e *SimpleExec) executeDropStats(ctx context.Context, s *ast.DropStatsStmt) (err error) {
h := domain.GetDomain(e.Ctx()).StatsHandle()
var statsIDs []int64
// TODO: GLOBAL option will be deprecated. Also remove this condition when the syntax is removed
Expand All @@ -2760,7 +2760,7 @@ func (e *SimpleExec) executeDropStats(s *ast.DropStatsStmt) (err error) {
if err := h.DeleteTableStatsFromKV(statsIDs); err != nil {
return err
}
return h.Update(e.Ctx().GetInfoSchema().(infoschema.InfoSchema))
return h.Update(ctx, e.Ctx().GetInfoSchema().(infoschema.InfoSchema))
}

func (e *SimpleExec) autoNewTxn() bool {
Expand Down
14 changes: 7 additions & 7 deletions pkg/executor/test/analyzetest/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func TestAdjustSampleRateNote(t *testing.T) {
tblInfo := tbl.Meta()
tid := tblInfo.ID
tk.MustExec(fmt.Sprintf("update mysql.stats_meta set count = 220000 where table_id=%d", tid))
require.NoError(t, statsHandle.Update(is))
require.NoError(t, statsHandle.Update(context.Background(), is))
result := tk.MustQuery("show stats_meta where table_name = 't'")
require.Equal(t, "220000", result.Rows()[0][5])
tk.MustExec("analyze table t")
Expand All @@ -514,7 +514,7 @@ func TestAdjustSampleRateNote(t *testing.T) {
))
tk.MustExec("insert into t values(1),(1),(1)")
require.NoError(t, statsHandle.DumpStatsDeltaToKV(true))
require.NoError(t, statsHandle.Update(is))
require.NoError(t, statsHandle.Update(context.Background(), is))
result = tk.MustQuery("show stats_meta where table_name = 't'")
require.Equal(t, "3", result.Rows()[0][5])
tk.MustExec("analyze table t")
Expand Down Expand Up @@ -746,7 +746,7 @@ func TestSavedAnalyzeOptions(t *testing.T) {
// auto-analyze uses the table-level options
tk.MustExec("insert into t values (10,10,10)")
require.Nil(t, h.DumpStatsDeltaToKV(true))
require.Nil(t, h.Update(is))
require.Nil(t, h.Update(context.Background(), is))
h.HandleAutoAnalyze()
tbl = h.GetTableStats(tableInfo)
require.Greater(t, tbl.Version, lastVersion)
Expand Down Expand Up @@ -1098,7 +1098,7 @@ func TestSavedAnalyzeColumnOptions(t *testing.T) {

tk.MustExec("insert into t values (5,5,5),(6,6,6)")
require.Nil(t, h.DumpStatsDeltaToKV(true))
require.Nil(t, h.Update(is))
require.Nil(t, h.Update(context.Background(), is))
// auto analyze uses the saved option(predicate columns).
h.HandleAutoAnalyze()
tblStats = h.GetTableStats(tblInfo)
Expand Down Expand Up @@ -1906,7 +1906,7 @@ func testKillAutoAnalyze(t *testing.T, ver int) {
tk.MustExec("analyze table t")
tk.MustExec("insert into t values (5,6), (7,8), (9, 10)")
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(is))
require.NoError(t, h.Update(context.Background(), is))
table, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tableInfo := table.Meta()
Expand Down Expand Up @@ -2726,7 +2726,7 @@ func TestAutoAnalyzeAwareGlobalVariableChange(t *testing.T) {
tid := tbl.Meta().ID
tk.MustExec("insert into t values(1),(2),(3)")
require.NoError(t, h.DumpStatsDeltaToKV(true))
err = h.Update(dom.InfoSchema())
err = h.Update(context.Background(), dom.InfoSchema())
require.NoError(t, err)
tk.MustExec("analyze table t")
tk.MustQuery(fmt.Sprintf("select count, modify_count from mysql.stats_meta where table_id = %d", tid)).Check(testkit.Rows(
Expand All @@ -2750,7 +2750,7 @@ func TestAutoAnalyzeAwareGlobalVariableChange(t *testing.T) {

tk.MustExec("insert into t values(4),(5),(6)")
require.NoError(t, h.DumpStatsDeltaToKV(true))
err = h.Update(dom.InfoSchema())
err = h.Update(context.Background(), dom.InfoSchema())
require.NoError(t, err)

// Simulate that the analyze would start before and finish after the second insert.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package memorycontrol

import (
"context"
"fmt"
"runtime"
"strings"
Expand Down Expand Up @@ -161,7 +162,7 @@ func TestGlobalMemoryControlForAutoAnalyze(t *testing.T) {

tk.MustExec("insert into t values(4),(5),(6)")
require.NoError(t, h.DumpStatsDeltaToKV(true))
err := h.Update(dom.InfoSchema())
err := h.Update(context.Background(), dom.InfoSchema())
require.NoError(t, err)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/memory/ReadMemStats", `return(536870912)`))
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/test/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,15 +623,15 @@ func TestShowStatsHealthy(t *testing.T) {
tk.MustExec("insert into t values (3), (4), (5), (6), (7), (8), (9), (10)")
err = do.StatsHandle().DumpStatsDeltaToKV(true)
require.NoError(t, err)
err = do.StatsHandle().Update(do.InfoSchema())
err = do.StatsHandle().Update(context.Background(), do.InfoSchema())
require.NoError(t, err)
tk.MustQuery("show stats_healthy").Check(testkit.Rows("test t 0"))
tk.MustExec("analyze table t")
tk.MustQuery("show stats_healthy").Check(testkit.Rows("test t 100"))
tk.MustExec("delete from t")
err = do.StatsHandle().DumpStatsDeltaToKV(true)
require.NoError(t, err)
err = do.StatsHandle().Update(do.InfoSchema())
err = do.StatsHandle().Update(context.Background(), do.InfoSchema())
require.NoError(t, err)
tk.MustQuery("show stats_healthy").Check(testkit.Rows("test t 0"))
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/test/simpletest/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ func TestDropStats(t *testing.T) {
require.False(t, statsTbl.Pseudo)

testKit.MustExec("drop stats t")
require.Nil(t, h.Update(is))
require.Nil(t, h.Update(context.Background(), is))
statsTbl = h.GetTableStats(tableInfo)
require.True(t, statsTbl.Pseudo)

Expand All @@ -578,7 +578,7 @@ func TestDropStats(t *testing.T) {

h.SetLease(1)
testKit.MustExec("drop stats t")
require.Nil(t, h.Update(is))
require.Nil(t, h.Update(context.Background(), is))
statsTbl = h.GetTableStats(tableInfo)
require.True(t, statsTbl.Pseudo)
h.SetLease(0)
Expand Down Expand Up @@ -609,7 +609,7 @@ func TestDropStatsForMultipleTable(t *testing.T) {
require.False(t, statsTbl2.Pseudo)

testKit.MustExec("drop stats t1, t2")
require.Nil(t, h.Update(is))
require.Nil(t, h.Update(context.Background(), is))
statsTbl1 = h.GetTableStats(tableInfo1)
require.True(t, statsTbl1.Pseudo)
statsTbl2 = h.GetTableStats(tableInfo2)
Expand All @@ -623,7 +623,7 @@ func TestDropStatsForMultipleTable(t *testing.T) {

h.SetLease(1)
testKit.MustExec("drop stats t1, t2")
require.Nil(t, h.Update(is))
require.Nil(t, h.Update(context.Background(), is))
statsTbl1 = h.GetTableStats(tableInfo1)
require.True(t, statsTbl1.Pseudo)
statsTbl2 = h.GetTableStats(tableInfo2)
Expand Down
Loading

0 comments on commit 06e0e17

Please sign in to comment.