Skip to content

Commit

Permalink
domain: make the transaction from initStatsCtx blocking gc (#53602)
Browse files Browse the repository at this point in the history
close #53592
  • Loading branch information
you06 authored May 28, 2024
1 parent 772db20 commit bf704fd
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 3 deletions.
19 changes: 19 additions & 0 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2258,6 +2258,25 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
},
"analyzeJobsCleanupWorker",
)
do.wg.Run(
func() {
// The initStatsCtx is used to store the internal session for initializing stats,
// so we need the gc min start ts calculation to track it as an internal session.
// Since the session manager may not be ready at this moment, `infosync.StoreInternalSession` can fail.
// we need to retry until the session manager is ready or the init stats completes.
for !infosync.StoreInternalSession(initStatsCtx) {
waitRetry := time.After(time.Second)
select {
case <-do.StatsHandle().InitStatsDone:
return
case <-waitRetry:
}
}
<-do.StatsHandle().InitStatsDone
infosync.DeleteInternalSession(initStatsCtx)
},
"RemoveInitStatsFromInternalSessions",
)
return nil
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,16 +1243,18 @@ func ConfigureTiFlashPDForPartitions(accel bool, definitions *[]model.PartitionD
}

// StoreInternalSession is the entry function for store an internal session to SessionManager.
func StoreInternalSession(se any) {
// return whether the session is stored successfully.
func StoreInternalSession(se any) bool {
is, err := getGlobalInfoSyncer()
if err != nil {
return
return false
}
sm := is.GetSessionManager()
if sm == nil {
return
return false
}
sm.StoreInternalSession(se)
return true
}

// DeleteInternalSession is the entry function for delete an internal session from SessionManager.
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ go_test(
"@com_github_prometheus_client_golang//prometheus/testutil",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_goleak//:goleak",
Expand Down
45 changes: 45 additions & 0 deletions pkg/server/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/server/internal/util"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func TestUptime(t *testing.T) {
Expand Down Expand Up @@ -63,3 +65,46 @@ func TestUptime(t *testing.T) {
require.NoError(t, err)
require.GreaterOrEqual(t, stats[upTime].(int64), int64(time.Since(time.Unix(1282967700, 0)).Seconds()))
}

func TestInitStatsSessionBlockGC(t *testing.T) {
origConfig := config.GetGlobalConfig()
defer func() {
config.StoreGlobalConfig(origConfig)
}()
newConfig := *origConfig
for _, lite := range []bool{false, true} {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStats", "pause"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStatsLite", "pause"))
newConfig.Performance.LiteInitStats = lite
config.StoreGlobalConfig(&newConfig)

store, err := mockstore.NewMockStore()
require.NoError(t, err)
dom, err := session.BootstrapSession(store)
require.NoError(t, err)

infoSyncer := dom.InfoSyncer()
sv := CreateMockServer(t, store)
sv.SetDomain(dom)
infoSyncer.SetSessionManager(sv)
time.Sleep(time.Second)
require.Eventually(t, func() bool {
now := time.Now()
startTSList := sv.GetInternalSessionStartTSList()
for _, startTs := range startTSList {
if startTs != 0 {
startTime := oracle.GetTimeFromTS(startTs)
// test pass if the min_start_ts is blocked over 1s.
if now.Sub(startTime) > time.Second {
return true
}
}
}
return false
}, 10*time.Second, 10*time.Millisecond, "min_start_ts is not blocked over 1s")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStats"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStatsLite"))
dom.Close()
require.NoError(t, store.Close())
}
}
1 change: 1 addition & 0 deletions pkg/statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ go_library(
"//pkg/util/chunk",
"//pkg/util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_uber_go_zap//:zap",
],
)
Expand Down
3 changes: 3 additions & 0 deletions pkg/statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -694,6 +695,7 @@ func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) {
if err != nil {
return err
}
failpoint.Inject("beforeInitStatsLite", func() {})
cache, err := h.initStatsMeta(is)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -721,6 +723,7 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
if err != nil {
return err
}
failpoint.Inject("beforeInitStats", func() {})
cache, err := h.initStatsMeta(is)
if err != nil {
return errors.Trace(err)
Expand Down

0 comments on commit bf704fd

Please sign in to comment.