From d6eda8449fc2d642e6e832f354bb7e01be696809 Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Wed, 14 Dec 2022 17:31:48 +0800 Subject: [PATCH 01/19] add mpp probe realize --- store/copr/mpp_probe.go | 175 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 store/copr/mpp_probe.go diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go new file mode 100644 index 0000000000000..b0f48f5fc4e51 --- /dev/null +++ b/store/copr/mpp_probe.go @@ -0,0 +1,175 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package copr + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/mpp" + "github.com/pingcap/tidb/util/logutil" + "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/tikvrpc" + "go.uber.org/zap" +) + +const ( + DetectTimeoutLimit = 2 * time.Second + DetectPeriod = 3 * time.Second + MaxRecoveryTimeLimit = 15 * time.Minute // wait TiFlash recovery,more than MPPStoreFailTTL + MaxObsoletTimeLimit = 24 * time.Hour // no request for a long time,that might be obsoleted +) + +// MPPSotreState the state for MPPStore. +type MPPSotreState struct { + address string // MPPStore TiFlash address + store *kvStore + + recoveryTime time.Time + lastLookupTime time.Time + lastDetectTime time.Time +} + +// MPPFailedStoreProbe use for detecting of failed TiFlash instance +type MPPFailedStoreProbe struct { + failedMPPStores sync.Map +} + +func (t *MPPSotreState) detect(ctx context.Context) { + if time.Since(t.lastDetectTime) > DetectPeriod { + return + } + + defer func() { t.lastDetectTime = time.Now() }() + + err := detectMPPStore(ctx, t.store.GetTiKVClient(), t.address) + if err != nil { + t.recoveryTime = time.Time{} // if detect failed,reset recovery time to zero. 
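+		// Editorial note (not part of the original patch): recoveryTime stays at its zero value
+		// while the store keeps failing; isRecovery() only reports true once it has been non-zero
+		// for longer than the caller's recoveryTTL (MPPStoreFailTTL).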
+ return + } + + // record the time of the first recovery + if t.recoveryTime.IsZero() { + t.recoveryTime = time.Now() + } + +} + +func (t *MPPSotreState) isRecovery(ctx context.Context, recoveryTTL time.Duration) bool { + t.lastLookupTime = time.Now() + if !t.recoveryTime.IsZero() && time.Since(t.recoveryTime) > recoveryTTL { + return true + } + return false +} + +func (t MPPFailedStoreProbe) scan(ctx context.Context) { + defer func() { + if r := recover(); r != nil { + logutil.Logger(ctx).Warn("mpp failed store probe scan error", zap.Any("recover", r), zap.Stack("stack")) + } + }() + + do := func(k, v any) { + address := fmt.Sprintln(k) + state, ok := v.(MPPSotreState) + if !ok { + logutil.BgLogger().Warn("MPPSotreState struct Deserialization exception", + zap.String("address", address), + zap.Any("state", v)) + return + } + + state.detect(ctx) + + // clean restored store + if !state.recoveryTime.IsZero() && time.Since(state.recoveryTime) > MaxRecoveryTimeLimit { + t.failedMPPStores.Delete(address) + // clean store that may be obsolete + } else if !state.recoveryTime.IsZero() && time.Since(state.lastLookupTime) > MaxObsoletTimeLimit { + t.failedMPPStores.Delete(address) + } + } + + f := func(k, v any) bool { + go do(k, v) + return true + } + + t.failedMPPStores.Range(f) +} + +// Add add a store when sync probe failed +func (t MPPFailedStoreProbe) Add(ctx context.Context, address string, store *kvStore) { + state := MPPSotreState{ + address: address, + store: store, + } + v, ok := t.failedMPPStores.LoadOrStore(address, state) + if !ok { + logutil.Logger(ctx).Debug("failed store repeat add", zap.String("address", address), zap.Any("state", v)) + } + +} + +// IsRecovery check whether the store is recovery +func (t MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, recoveryTTL time.Duration) bool { + v, ok := t.failedMPPStores.Load(address) + if !ok { + // store not failed map + return true + } + + state, ok := v.(MPPSotreState) + if !ok { + logutil.Logger(ctx).Warn("MPPSotreState struct Deserialization exception", + zap.String("address", address), + zap.Any("state", v)) + return false + } + return state.isRecovery(ctx, recoveryTTL) +} + +// Run a loop of scan +// there can be only one background task +func (t *MPPFailedStoreProbe) Run() { + for { + t.scan(context.Background()) + time.Sleep(DetectPeriod) + } +} + +// MPPStore detect function +func detectMPPStore(ctx context.Context, client tikv.Client, address string) error { + resp, err := client.SendRequest(ctx, address, &tikvrpc.Request{ + Type: tikvrpc.CmdMPPAlive, + StoreTp: tikvrpc.TiFlash, + Req: &mpp.IsAliveRequest{}, + Context: kvrpcpb.Context{}, + }, DetectTimeoutLimit) + if err != nil || !resp.Resp.(*mpp.IsAliveResponse).Available { + if err == nil { + err = fmt.Errorf("store not ready to serve") + } + logutil.BgLogger().Warn("Store is not ready", + zap.String("store address", address), + zap.String("err message", err.Error())) + return err + } + return nil +} From 8aab258c4328c63688b4cf171746149c6f2af9d3 Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Wed, 14 Dec 2022 17:31:48 +0800 Subject: [PATCH 02/19] add mpp probe realize --- store/copr/mpp_probe.go | 183 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 183 insertions(+) create mode 100644 store/copr/mpp_probe.go diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go new file mode 100644 index 0000000000000..905c45c2c91ee --- /dev/null +++ b/store/copr/mpp_probe.go @@ -0,0 +1,183 @@ +// Copyright 2022 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package copr + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/mpp" + "github.com/pingcap/tidb/util/logutil" + "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/tikvrpc" + "go.uber.org/zap" +) + +var globalMPPFailedStoreProbe *MPPFailedStoreProbe + +const ( + DetectTimeoutLimit = 2 * time.Second + DetectPeriod = 3 * time.Second + MaxRecoveryTimeLimit = 15 * time.Minute // wait TiFlash recovery,more than MPPStoreFailTTL + MaxObsoletTimeLimit = 24 * time.Hour // no request for a long time,that might be obsoleted +) + +// MPPSotreState the state for MPPStore. +type MPPSotreState struct { + address string // MPPStore TiFlash address + store *kvStore + + recoveryTime time.Time + lastLookupTime time.Time + lastDetectTime time.Time +} + +// MPPFailedStoreProbe use for detecting of failed TiFlash instance +type MPPFailedStoreProbe struct { + failedMPPStores *sync.Map +} + +func (t *MPPSotreState) detect(ctx context.Context) { + if time.Since(t.lastDetectTime) > DetectPeriod { + return + } + + defer func() { t.lastDetectTime = time.Now() }() + + err := detectMPPStore(ctx, t.store.GetTiKVClient(), t.address) + if err != nil { + t.recoveryTime = time.Time{} // if detect failed,reset recovery time to zero. 
+ return + } + + // record the time of the first recovery + if t.recoveryTime.IsZero() { + t.recoveryTime = time.Now() + } + +} + +func (t *MPPSotreState) isRecovery(ctx context.Context, recoveryTTL time.Duration) bool { + t.lastLookupTime = time.Now() + if !t.recoveryTime.IsZero() && time.Since(t.recoveryTime) > recoveryTTL { + return true + } + return false +} + +func (t MPPFailedStoreProbe) scan(ctx context.Context) { + defer func() { + if r := recover(); r != nil { + logutil.Logger(ctx).Warn("mpp failed store probe scan error", zap.Any("recover", r), zap.Stack("stack")) + } + }() + + do := func(k, v any) { + address := fmt.Sprintln(k) + state, ok := v.(MPPSotreState) + if !ok { + logutil.BgLogger().Warn("MPPSotreState struct Deserialization exception", + zap.String("address", address), + zap.Any("state", v)) + return + } + + state.detect(ctx) + + // clean restored store + if !state.recoveryTime.IsZero() && time.Since(state.recoveryTime) > MaxRecoveryTimeLimit { + t.failedMPPStores.Delete(address) + // clean store that may be obsolete + } else if !state.recoveryTime.IsZero() && time.Since(state.lastLookupTime) > MaxObsoletTimeLimit { + t.failedMPPStores.Delete(address) + } + } + + f := func(k, v any) bool { + go do(k, v) + return true + } + + t.failedMPPStores.Range(f) +} + +// Add add a store when sync probe failed +func (t MPPFailedStoreProbe) Add(ctx context.Context, address string, store *kvStore) { + state := MPPSotreState{ + address: address, + store: store, + } + v, ok := t.failedMPPStores.LoadOrStore(address, state) + if !ok { + logutil.Logger(ctx).Debug("failed store repeat add", zap.String("address", address), zap.Any("state", v)) + } + +} + +// IsRecovery check whether the store is recovery +func (t MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, recoveryTTL time.Duration) bool { + v, ok := t.failedMPPStores.Load(address) + if !ok { + // store not failed map + return true + } + + state, ok := v.(MPPSotreState) + if !ok { + logutil.Logger(ctx).Warn("MPPSotreState struct Deserialization exception", + zap.String("address", address), + zap.Any("state", v)) + return false + } + return state.isRecovery(ctx, recoveryTTL) +} + +// Run a loop of scan +// there can be only one background task +func (t *MPPFailedStoreProbe) Run() { + for { + t.scan(context.Background()) + time.Sleep(DetectPeriod) + } +} + +// MPPStore detect function +func detectMPPStore(ctx context.Context, client tikv.Client, address string) error { + resp, err := client.SendRequest(ctx, address, &tikvrpc.Request{ + Type: tikvrpc.CmdMPPAlive, + StoreTp: tikvrpc.TiFlash, + Req: &mpp.IsAliveRequest{}, + Context: kvrpcpb.Context{}, + }, DetectTimeoutLimit) + if err != nil || !resp.Resp.(*mpp.IsAliveResponse).Available { + if err == nil { + err = fmt.Errorf("store not ready to serve") + } + logutil.BgLogger().Warn("Store is not ready", + zap.String("store address", address), + zap.String("err message", err.Error())) + return err + } + return nil +} + +func init() { + globalMPPFailedStoreProbe = &MPPFailedStoreProbe{ + failedMPPStores: &sync.Map{}, + } +} From aedd8a424d1af72bc00d0f9422a4e4343b6d5f63 Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Thu, 15 Dec 2022 17:14:07 +0800 Subject: [PATCH 03/19] bugfix and optmize log --- store/copr/mpp_probe.go | 45 ++++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index 0de7327db295a..78150a2de87b8 100644 --- a/store/copr/mpp_probe.go +++ 
b/store/copr/mpp_probe.go @@ -41,6 +41,7 @@ const ( type MPPSotreState struct { address string // MPPStore TiFlash address store *kvStore + lock sync.Mutex recoveryTime time.Time lastLookupTime time.Time @@ -53,7 +54,12 @@ type MPPFailedStoreProbe struct { } func (t *MPPSotreState) detect(ctx context.Context) { - if time.Since(t.lastDetectTime) > DetectPeriod { + if !t.lock.TryLock() { + return + } + defer t.lock.Unlock() + + if time.Since(t.lastDetectTime) < DetectPeriod { return } @@ -75,30 +81,31 @@ func (t *MPPSotreState) detect(ctx context.Context) { func (t *MPPSotreState) isRecovery(ctx context.Context, recoveryTTL time.Duration) bool { t.lastLookupTime = time.Now() if !t.recoveryTime.IsZero() && time.Since(t.recoveryTime) > recoveryTTL { - logutil.Logger(ctx).Debug("Cannot detect store's availability"+ - "because the current time has not reached recoveryTime + mppStoreFailTTL", - zap.String("store address", t.address), - zap.Time("recovery time", t.recoveryTime), - zap.Duration("MPPStoreFailTTL", recoveryTTL)) return true } + logutil.Logger(ctx).Debug("Cannot detect store's availability "+ + "because the current time has not recovery or wait mppStoreFailTTL", + zap.String("store address", t.address), + zap.Time("recovery time", t.recoveryTime), + zap.Duration("MPPStoreFailTTL", recoveryTTL)) return false } func (t MPPFailedStoreProbe) scan(ctx context.Context) { defer func() { if r := recover(); r != nil { - logutil.Logger(ctx).Warn("mpp failed store probe scan error", zap.Any("recover", r), zap.Stack("stack")) + logutil.Logger(ctx).Warn("mpp failed store probe scan error,will restart", zap.Any("recover", r), zap.Stack("stack")) } }() do := func(k, v any) { address := fmt.Sprintln(k) - state, ok := v.(MPPSotreState) + state, ok := v.(*MPPSotreState) if !ok { - logutil.BgLogger().Warn("MPPSotreState struct Deserialization exception", + logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean", zap.String("address", address), zap.Any("state", v)) + t.failedMPPStores.Delete(address) return } @@ -127,28 +134,28 @@ func (t MPPFailedStoreProbe) Add(ctx context.Context, address string, store *kvS address: address, store: store, } - v, ok := t.failedMPPStores.LoadOrStore(address, state) - if !ok { - logutil.Logger(ctx).Debug("failed store repeat add", zap.String("address", address), zap.Any("state", v)) - } - + logutil.Logger(ctx).Debug("add mpp store to failed list", zap.String("address", address)) + t.failedMPPStores.Store(address, &state) } // IsRecovery check whether the store is recovery func (t MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, recoveryTTL time.Duration) bool { + logutil.Logger(ctx).Debug("check failed store recovery", zap.String("address", address), zap.Duration("ttl", recoveryTTL)) v, ok := t.failedMPPStores.Load(address) if !ok { - // store not failed map + // store not in failed map return true } - state, ok := v.(MPPSotreState) + state, ok := v.(*MPPSotreState) if !ok { - logutil.Logger(ctx).Warn("MPPSotreState struct Deserialization exception", + logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean", zap.String("address", address), zap.Any("state", v)) + t.failedMPPStores.Delete(address) return false } + return state.isRecovery(ctx, recoveryTTL) } @@ -157,7 +164,7 @@ func (t MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, rec func (t *MPPFailedStoreProbe) Run() { for { t.scan(context.Background()) - time.Sleep(DetectPeriod) + time.Sleep(time.Second) } } @@ -185,4 +192,6 @@ func init() 
{ globalMPPFailedStoreProbe = &MPPFailedStoreProbe{ failedMPPStores: &sync.Map{}, } + // run a background probe process + go globalMPPFailedStoreProbe.Run() } From b86ec48fc291897b2af9a2126080f085cd68abc5 Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Thu, 15 Dec 2022 19:22:17 +0800 Subject: [PATCH 04/19] add metrics and fix unit test. --- metrics/metrics.go | 2 ++ metrics/server.go | 8 ++++++++ store/copr/mpp_probe.go | 45 +++++++++++++++++++++++++++++------------ 3 files changed, 42 insertions(+), 13 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index 2984b66ddb27c..70eff78bc4318 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -182,6 +182,7 @@ func RegisterMetrics() { prometheus.MustRegister(TokenGauge) prometheus.MustRegister(ConfigStatus) prometheus.MustRegister(TiFlashQueryTotalCounter) + prometheus.MustRegister(TiFlashFailedMPPStoreState) prometheus.MustRegister(SmallTxnWriteDuration) prometheus.MustRegister(TxnWriteThroughput) prometheus.MustRegister(LoadSysVarCacheCounter) @@ -236,6 +237,7 @@ func ToggleSimplifiedMode(simplified bool) { InfoCacheCounters, ReadFromTableCacheCounter, TiFlashQueryTotalCounter, + TiFlashFailedMPPStoreState, CampaignOwnerCounter, NonTransactionalDMLCount, MemoryUsage, diff --git a/metrics/server.go b/metrics/server.go index 116b02eb122b6..9425bbf94e960 100644 --- a/metrics/server.go +++ b/metrics/server.go @@ -279,6 +279,14 @@ var ( Help: "Counter of TiFlash queries.", }, []string{LblType, LblResult}) + TiFlashFailedMPPStoreState = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "tidb", + Subsystem: "server", + Name: "tiflash_failed_mpp_store", + Help: "Statues of failed tiflash mpp store,-1 means detector heartbeat,0 means reachable,1 means abnormal.", + }, []string{LblAddress}) + PDAPIExecutionHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "tidb", diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index 78150a2de87b8..a5fb4e8335cca 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/mpp" + "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/util/logutil" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" @@ -51,6 +52,7 @@ type MPPSotreState struct { // MPPFailedStoreProbe use for detecting of failed TiFlash instance type MPPFailedStoreProbe struct { failedMPPStores *sync.Map + lock *sync.Mutex } func (t *MPPSotreState) detect(ctx context.Context) { @@ -64,9 +66,10 @@ func (t *MPPSotreState) detect(ctx context.Context) { } defer func() { t.lastDetectTime = time.Now() }() - + metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(0) err := detectMPPStore(ctx, t.store.GetTiKVClient(), t.address) if err != nil { + metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(1) t.recoveryTime = time.Time{} // if detect failed,reset recovery time to zero. 
return } @@ -105,7 +108,7 @@ func (t MPPFailedStoreProbe) scan(ctx context.Context) { logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean", zap.String("address", address), zap.Any("state", v)) - t.failedMPPStores.Delete(address) + t.Delete(address) return } @@ -113,10 +116,10 @@ func (t MPPFailedStoreProbe) scan(ctx context.Context) { // clean restored store if !state.recoveryTime.IsZero() && time.Since(state.recoveryTime) > MaxRecoveryTimeLimit { - t.failedMPPStores.Delete(address) + t.Delete(address) // clean store that may be obsolete } else if !state.recoveryTime.IsZero() && time.Since(state.lastLookupTime) > MaxObsoletTimeLimit { - t.failedMPPStores.Delete(address) + t.Delete(address) } } @@ -129,7 +132,10 @@ func (t MPPFailedStoreProbe) scan(ctx context.Context) { } // Add add a store when sync probe failed -func (t MPPFailedStoreProbe) Add(ctx context.Context, address string, store *kvStore) { +func (t *MPPFailedStoreProbe) Add(ctx context.Context, address string, store *kvStore) { + // run a background probe process,if not start + globalMPPFailedStoreProbe.Run() + state := MPPSotreState{ address: address, store: store, @@ -139,8 +145,9 @@ func (t MPPFailedStoreProbe) Add(ctx context.Context, address string, store *kvS } // IsRecovery check whether the store is recovery -func (t MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, recoveryTTL time.Duration) bool { - logutil.Logger(ctx).Debug("check failed store recovery", zap.String("address", address), zap.Duration("ttl", recoveryTTL)) +func (t *MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, recoveryTTL time.Duration) bool { + logutil.Logger(ctx).Debug("check failed store recovery", + zap.String("address", address), zap.Duration("ttl", recoveryTTL)) v, ok := t.failedMPPStores.Load(address) if !ok { // store not in failed map @@ -152,7 +159,7 @@ func (t MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, rec logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean", zap.String("address", address), zap.Any("state", v)) - t.failedMPPStores.Delete(address) + t.Delete(address) return false } @@ -162,10 +169,23 @@ func (t MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, rec // Run a loop of scan // there can be only one background task func (t *MPPFailedStoreProbe) Run() { - for { - t.scan(context.Background()) - time.Sleep(time.Second) + if !t.lock.TryLock() { + return } + go func() { + defer t.lock.Unlock() + for { + metrics.TiFlashFailedMPPStoreState.WithLabelValues("probe").Set(-1) //probe heartbeat + t.scan(context.Background()) + time.Sleep(time.Second) + } + }() + +} + +func (t *MPPFailedStoreProbe) Delete(address string) { + metrics.TiFlashFailedMPPStoreState.DeleteLabelValues(address) + t.failedMPPStores.Delete(address) } // MPPStore detect function @@ -191,7 +211,6 @@ func detectMPPStore(ctx context.Context, client tikv.Client, address string) err func init() { globalMPPFailedStoreProbe = &MPPFailedStoreProbe{ failedMPPStores: &sync.Map{}, + lock: &sync.Mutex{}, } - // run a background probe process - go globalMPPFailedStoreProbe.Run() } From d6c576f8533f28c32231f8d937abf081640f5564 Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Fri, 16 Dec 2022 11:08:29 +0800 Subject: [PATCH 05/19] clean unse code for mppStoreLastFailTime --- go.mod | 2 +- kv/mpp.go | 3 +- planner/core/fragment.go | 2 +- sessionctx/sessionstates/session_states.go | 3 -- .../sessionstates/session_states_test.go | 19 ----------- 
sessionctx/variable/session.go | 13 -------- store/copr/batch_coprocessor.go | 32 +++++++++---------- store/copr/batch_coprocessor_test.go | 4 +-- store/copr/mpp.go | 6 ++-- store/copr/mpp_probe.go | 15 +++++---- 10 files changed, 32 insertions(+), 67 deletions(-) diff --git a/go.mod b/go.mod index ca9c46da3977b..eebe7e139ec85 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.19 require ( cloud.google.com/go/storage v1.21.0 + github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0 github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.12.0 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.2.0 github.com/BurntSushi/toml v1.2.1 @@ -129,7 +130,6 @@ require ( cloud.google.com/go v0.100.2 // indirect cloud.google.com/go/compute v1.5.0 // indirect cloud.google.com/go/iam v0.1.1 // indirect - github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1 // indirect github.com/DataDog/zstd v1.4.5 // indirect github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect diff --git a/kv/mpp.go b/kv/mpp.go index b0752f8186deb..14c6f4c5f9b5b 100644 --- a/kv/mpp.go +++ b/kv/mpp.go @@ -16,7 +16,6 @@ package kv import ( "context" - "sync" "time" "github.com/pingcap/kvproto/pkg/mpp" @@ -81,7 +80,7 @@ type MPPDispatchRequest struct { type MPPClient interface { // ConstructMPPTasks schedules task for a plan fragment. // TODO:: This interface will be refined after we support more executors. - ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, *sync.Map, time.Duration) ([]MPPTaskMeta, error) + ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, bool, time.Duration) ([]MPPTaskMeta, error) // DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data. 
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64) Response diff --git a/planner/core/fragment.go b/planner/core/fragment.go index c6aec17f21e6d..5c26009d4b2f9 100644 --- a/planner/core/fragment.go +++ b/planner/core/fragment.go @@ -378,7 +378,7 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic logutil.BgLogger().Warn("MPP store fail ttl is invalid", zap.Error(err)) ttl = 30 * time.Second } - metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, e.ctx.GetSessionVars().MPPStoreLastFailTime, ttl) + metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, true, ttl) if err != nil { return nil, errors.Trace(err) } diff --git a/sessionctx/sessionstates/session_states.go b/sessionctx/sessionstates/session_states.go index 36ea0b22455d7..c9e1652a9c1df 100644 --- a/sessionctx/sessionstates/session_states.go +++ b/sessionctx/sessionstates/session_states.go @@ -15,8 +15,6 @@ package sessionstates import ( - "time" - "github.com/pingcap/tidb/errno" ptypes "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tidb/sessionctx/stmtctx" @@ -79,7 +77,6 @@ type SessionStates struct { FoundInPlanCache bool `json:"in-plan-cache,omitempty"` FoundInBinding bool `json:"in-binding,omitempty"` SequenceLatestValues map[int64]int64 `json:"seq-values,omitempty"` - MPPStoreLastFailTime map[string]time.Time `json:"store-fail-time,omitempty"` LastAffectedRows int64 `json:"affected-rows,omitempty"` LastInsertID uint64 `json:"last-insert-id,omitempty"` Warnings []stmtctx.SQLWarn `json:"warnings,omitempty"` diff --git a/sessionctx/sessionstates/session_states_test.go b/sessionctx/sessionstates/session_states_test.go index 5910d6b18e071..d0b2b5e9ec83f 100644 --- a/sessionctx/sessionstates/session_states_test.go +++ b/sessionctx/sessionstates/session_states_test.go @@ -20,9 +20,7 @@ import ( "fmt" "strconv" "strings" - "sync" "testing" - "time" "github.com/pingcap/errors" "github.com/pingcap/tidb/config" @@ -378,23 +376,6 @@ func TestSessionCtx(t *testing.T) { tk.MustQuery("select nextval(test.s)").Check(testkit.Rows("2")) }, }, - { - // check MPPStoreLastFailTime - setFunc: func(tk *testkit.TestKit) any { - m := sync.Map{} - m.Store("store1", time.Now()) - tk.Session().GetSessionVars().MPPStoreLastFailTime = &m - return tk.Session().GetSessionVars().MPPStoreLastFailTime - }, - checkFunc: func(tk *testkit.TestKit, param any) { - failTime := tk.Session().GetSessionVars().MPPStoreLastFailTime - tm, ok := failTime.Load("store1") - require.True(t, ok) - v, ok := (param.(*sync.Map)).Load("store1") - require.True(t, ok) - require.True(t, tm.(time.Time).Equal(v.(time.Time))) - }, - }, { // check FoundInPlanCache setFunc: func(tk *testkit.TestKit) any { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index ad60ccba2cfa4..8aff987a702f0 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1170,9 +1170,6 @@ type SessionVars struct { // TemporaryTableData stores committed kv values for temporary table for current session. TemporaryTableData TemporaryTableData - // MPPStoreLastFailTime records the lastest fail time that a TiFlash store failed. It maps store address(string) to fail time(time.Time). - MPPStoreLastFailTime *sync.Map - // MPPStoreFailTTL indicates the duration that protect TiDB from sending task to a new recovered TiFlash. 
MPPStoreFailTTL string @@ -1666,7 +1663,6 @@ func NewSessionVars(hctx HookContext) *SessionVars { AllowFallbackToTiKV: make(map[kv.StoreType]struct{}), CTEMaxRecursionDepth: DefCTEMaxRecursionDepth, TMPTableSize: DefTiDBTmpTableMaxSize, - MPPStoreLastFailTime: new(sync.Map), MPPStoreFailTTL: DefTiDBMPPStoreFailTTL, Rng: mathutil.NewWithTime(), StatsLoadSyncWait: StatsLoadSyncWait.Load(), @@ -2293,12 +2289,6 @@ func (s *SessionVars) EncodeSessionStates(ctx context.Context, sessionStates *se } sessionStates.LastFoundRows = s.LastFoundRows sessionStates.SequenceLatestValues = s.SequenceState.GetAllStates() - sessionStates.MPPStoreLastFailTime = make(map[string]time.Time, 0) - s.MPPStoreLastFailTime.Range( - func(key, value interface{}) bool { - sessionStates.MPPStoreLastFailTime[key.(string)] = value.(time.Time) - return true - }) sessionStates.FoundInPlanCache = s.PrevFoundInPlanCache sessionStates.FoundInBinding = s.PrevFoundInBinding @@ -2334,9 +2324,6 @@ func (s *SessionVars) DecodeSessionStates(ctx context.Context, sessionStates *se } s.LastFoundRows = sessionStates.LastFoundRows s.SequenceState.SetAllStates(sessionStates.SequenceLatestValues) - for k, v := range sessionStates.MPPStoreLastFailTime { - s.MPPStoreLastFailTime.Store(k, v) - } s.FoundInPlanCache = sessionStates.FoundInPlanCache s.FoundInBinding = sessionStates.FoundInBinding diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index f913fcc9271fa..3faa400a44dc9 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -294,12 +294,11 @@ func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, ca // // The second balance strategy: Not only consider the region count between TiFlash stores, but also try to make the regions' range continuous(stored in TiFlash closely). // If balanceWithContinuity is true, the second balance strategy is enable. 
-func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, mppStoreLastFailTime *sync.Map, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask { +func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask { if len(originalTasks) == 0 { log.Info("Batch cop task balancer got an empty task set.") return originalTasks } - isMPP := mppStoreLastFailTime != nil // for mpp, we still need to detect the store availability if len(originalTasks) <= 1 && !isMPP { return originalTasks @@ -508,29 +507,29 @@ func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges, storeType kv.StoreType, - mppStoreLastFailTime *sync.Map, + isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { if config.GetGlobalConfig().DisaggregatedTiFlash { - return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) + return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } - return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) + return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, - mppStoreLastFailTime *sync.Map, + isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64, partitionIDs []int64) (batchTasks []*batchCopTask, err error) { if config.GetGlobalConfig().DisaggregatedTiFlash { - batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) + batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } else { - batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) + batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } if err != nil { return nil, err @@ -540,8 +539,8 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, return batchTasks, nil } -func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, mppStoreLastFailTime *sync.Map, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { - batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) +func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, isMPP bool, ttl time.Duration, 
balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { + batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) if err != nil { return nil, err } @@ -588,7 +587,7 @@ func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, ran // When `partitionIDs != nil`, it means that buildBatchCopTasksCore is constructing a batch cop tasks for PartitionTableScan. // At this time, `len(rangesForEachPhysicalTable) == len(partitionIDs)` and `rangesForEachPhysicalTable[i]` is for partition `partitionIDs[i]`. // Otherwise, `rangesForEachPhysicalTable[0]` indicates the range for the single physical table. -func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, mppStoreLastFailTime *sync.Map, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { +func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { cache := store.GetRegionCache() start := time.Now() const cmdType = tikvrpc.CmdBatchCop @@ -618,7 +617,6 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach storeTaskMap := make(map[string]*batchCopTask) needRetry := false - isMPP := mppStoreLastFailTime != nil for _, task := range tasks { rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP) if err != nil { @@ -670,7 +668,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach logutil.BgLogger().Debug(msg) } balanceStart := time.Now() - batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) + batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) balanceElapsed := time.Since(balanceStart) if log.GetLevel() <= zap.DebugLevel { msg := "After region balance:" @@ -736,11 +734,11 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V keyRanges = append(keyRanges, NewKeyRanges(pi.KeyRanges)) partitionIDs = append(partitionIDs, pi.ID) } - tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store.kvStore, keyRanges, req.StoreType, nil, 0, false, 0, partitionIDs) + tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store.kvStore, keyRanges, req.StoreType, false, 0, false, 0, partitionIDs) } else { // TODO: merge the if branch. 
ranges := NewKeyRanges(req.KeyRanges.FirstPartitionRange()) - tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store.kvStore, ranges, req.StoreType, nil, 0, false, 0) + tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store.kvStore, ranges, req.StoreType, false, 0, false, 0) } if err != nil { @@ -887,7 +885,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba ranges = append(ranges, *ran) }) } - ret, err := buildBatchCopTasksForNonPartitionedTable(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, nil, 0, false, 0) + ret, err := buildBatchCopTasksForNonPartitionedTable(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, false, 0, false, 0) return ret, err } // Retry Partition Table Scan @@ -906,7 +904,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba } keyRanges = append(keyRanges, NewKeyRanges(ranges)) } - ret, err := buildBatchCopTasksForPartitionedTable(bo, b.store, keyRanges, b.req.StoreType, nil, 0, false, 0, pid) + ret, err := buildBatchCopTasksForPartitionedTable(bo, b.store, keyRanges, b.req.StoreType, false, 0, false, 0, pid) return ret, err } diff --git a/store/copr/batch_coprocessor_test.go b/store/copr/batch_coprocessor_test.go index 5616f61c54365..3e10ce627b1f6 100644 --- a/store/copr/batch_coprocessor_test.go +++ b/store/copr/batch_coprocessor_test.go @@ -120,13 +120,13 @@ func TestBalanceBatchCopTaskWithContinuity(t *testing.T) { func TestBalanceBatchCopTaskWithEmptyTaskSet(t *testing.T) { { var nilTaskSet []*batchCopTask - nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, nil, time.Second, false, 0) + nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, false, time.Second, false, 0) require.True(t, nilResult == nil) } { emptyTaskSet := make([]*batchCopTask, 0) - emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, nil, time.Second, false, 0) + emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, false, time.Second, false, 0) require.True(t, emptyResult != nil) require.True(t, len(emptyResult) == 0) } diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 37d5629ac6f3d..cfbe6fb9abe00 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -62,7 +62,7 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta { } // ConstructMPPTasks receives ScheduleRequest, which are actually collects of kv ranges. We allocates MPPTaskMeta for them and returns. 
-func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest, mppStoreLastFailTime *sync.Map, ttl time.Duration) ([]kv.MPPTaskMeta, error) { +func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest, isMPP bool, ttl time.Duration) ([]kv.MPPTaskMeta, error) { ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS) bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil) var tasks []*batchCopTask @@ -74,13 +74,13 @@ func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasks rangesForEachPartition[i] = NewKeyRanges(p.KeyRanges) partitionIDs[i] = p.ID } - tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store, rangesForEachPartition, kv.TiFlash, mppStoreLastFailTime, ttl, true, 20, partitionIDs) + tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store, rangesForEachPartition, kv.TiFlash, isMPP, ttl, true, 20, partitionIDs) } else { if req.KeyRanges == nil { return c.selectAllTiFlashStore(), nil } ranges := NewKeyRanges(req.KeyRanges) - tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store, ranges, kv.TiFlash, mppStoreLastFailTime, ttl, true, 20) + tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store, ranges, kv.TiFlash, isMPP, ttl, true, 20) } if err != nil { diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index a5fb4e8335cca..9b362a7c60ab0 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -32,10 +32,14 @@ import ( var globalMPPFailedStoreProbe *MPPFailedStoreProbe const ( - DetectTimeoutLimit = 2 * time.Second - DetectPeriod = 3 * time.Second - MaxRecoveryTimeLimit = 15 * time.Minute // wait TiFlash recovery,more than MPPStoreFailTTL - MaxObsoletTimeLimit = 24 * time.Hour // no request for a long time,that might be obsoleted + // DetectTimeoutLimit detect timeout + DetectTimeoutLimit = 2 * time.Second + // DetectPeriod detect period + DetectPeriod = 3 * time.Second + // MaxRecoveryTimeLimit wait TiFlash recovery,more than MPPStoreFailTTL + MaxRecoveryTimeLimit = 15 * time.Minute + // MaxObsoletTimeLimit no request for a long time,that might be obsoleted + MaxObsoletTimeLimit = 24 * time.Hour ) // MPPSotreState the state for MPPStore. 
@@ -78,7 +82,6 @@ func (t *MPPSotreState) detect(ctx context.Context) { if t.recoveryTime.IsZero() { t.recoveryTime = time.Now() } - } func (t *MPPSotreState) isRecovery(ctx context.Context, recoveryTTL time.Duration) bool { @@ -180,9 +183,9 @@ func (t *MPPFailedStoreProbe) Run() { time.Sleep(time.Second) } }() - } +// Delete clean store from failed map func (t *MPPFailedStoreProbe) Delete(address string) { metrics.TiFlashFailedMPPStoreState.DeleteLabelValues(address) t.failedMPPStores.Delete(address) From aafaba409eda74abd0e5b0439a2346e75bd394fd Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Fri, 16 Dec 2022 18:51:26 +0800 Subject: [PATCH 06/19] add unit test and fix some bugs --- store/copr/batch_coprocessor.go | 5 +- store/copr/mpp_probe.go | 57 ++++++----- store/copr/mpp_probe_test.go | 164 ++++++++++++++++++++++++++++++++ store/copr/store.go | 2 + 4 files changed, 203 insertions(+), 25 deletions(-) create mode 100644 store/copr/mpp_probe_test.go diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 3faa400a44dc9..c7158d29dec42 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -340,9 +340,10 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks [] return } - err := detectMPPStore(ctx, kvStore.GetTiKVClient(), s.GetAddr()) + tikvClient := kvStore.GetTiKVClient() + err := detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit) if err != nil { - globalMPPFailedStoreProbe.Add(ctx, s.GetAddr(), kvStore) + globalMPPFailedStoreProbe.Add(ctx, s.GetAddr(), tikvClient) return } diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index 9b362a7c60ab0..6a7631b94ed7a 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -32,10 +32,10 @@ import ( var globalMPPFailedStoreProbe *MPPFailedStoreProbe const ( - // DetectTimeoutLimit detect timeout - DetectTimeoutLimit = 2 * time.Second // DetectPeriod detect period DetectPeriod = 3 * time.Second + // DetectTimeoutLimit detect timeout + DetectTimeoutLimit = 2 * time.Second // MaxRecoveryTimeLimit wait TiFlash recovery,more than MPPStoreFailTTL MaxRecoveryTimeLimit = 15 * time.Minute // MaxObsoletTimeLimit no request for a long time,that might be obsoleted @@ -44,9 +44,10 @@ const ( // MPPSotreState the state for MPPStore. 
type MPPSotreState struct { - address string // MPPStore TiFlash address - store *kvStore - lock sync.Mutex + address string // MPPStore TiFlash address + tikvClient tikv.Client + + lock sync.Mutex recoveryTime time.Time lastLookupTime time.Time @@ -57,21 +58,26 @@ type MPPSotreState struct { type MPPFailedStoreProbe struct { failedMPPStores *sync.Map lock *sync.Mutex + + detectPeriod time.Duration + detectTimeoutLimit time.Duration + maxRecoveryTimeLimit time.Duration + maxObsoletTimeLimit time.Duration } -func (t *MPPSotreState) detect(ctx context.Context) { +func (t *MPPSotreState) detect(ctx context.Context, detectPeriod time.Duration, detectTimeoutLimit time.Duration) { if !t.lock.TryLock() { return } defer t.lock.Unlock() - if time.Since(t.lastDetectTime) < DetectPeriod { + if time.Since(t.lastDetectTime) < detectPeriod { return } defer func() { t.lastDetectTime = time.Now() }() metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(0) - err := detectMPPStore(ctx, t.store.GetTiKVClient(), t.address) + err := detectMPPStore(ctx, t.tikvClient, t.address, detectTimeoutLimit) if err != nil { metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(1) t.recoveryTime = time.Time{} // if detect failed,reset recovery time to zero. @@ -105,7 +111,7 @@ func (t MPPFailedStoreProbe) scan(ctx context.Context) { }() do := func(k, v any) { - address := fmt.Sprintln(k) + address := fmt.Sprint(k) state, ok := v.(*MPPSotreState) if !ok { logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean", @@ -115,13 +121,13 @@ func (t MPPFailedStoreProbe) scan(ctx context.Context) { return } - state.detect(ctx) + state.detect(ctx, t.detectPeriod, t.detectTimeoutLimit) // clean restored store - if !state.recoveryTime.IsZero() && time.Since(state.recoveryTime) > MaxRecoveryTimeLimit { + if !state.recoveryTime.IsZero() && time.Since(state.recoveryTime) > t.maxRecoveryTimeLimit { t.Delete(address) // clean store that may be obsolete - } else if !state.recoveryTime.IsZero() && time.Since(state.lastLookupTime) > MaxObsoletTimeLimit { + } else if state.recoveryTime.IsZero() && time.Since(state.lastLookupTime) > t.maxObsoletTimeLimit { t.Delete(address) } } @@ -135,13 +141,11 @@ func (t MPPFailedStoreProbe) scan(ctx context.Context) { } // Add add a store when sync probe failed -func (t *MPPFailedStoreProbe) Add(ctx context.Context, address string, store *kvStore) { - // run a background probe process,if not start - globalMPPFailedStoreProbe.Run() - +func (t *MPPFailedStoreProbe) Add(ctx context.Context, address string, tikvClient tikv.Client) { state := MPPSotreState{ - address: address, - store: store, + address: address, + tikvClient: tikvClient, + lastLookupTime: time.Now(), } logutil.Logger(ctx).Debug("add mpp store to failed list", zap.String("address", address)) t.failedMPPStores.Store(address, &state) @@ -188,17 +192,20 @@ func (t *MPPFailedStoreProbe) Run() { // Delete clean store from failed map func (t *MPPFailedStoreProbe) Delete(address string) { metrics.TiFlashFailedMPPStoreState.DeleteLabelValues(address) - t.failedMPPStores.Delete(address) + _, ok := t.failedMPPStores.LoadAndDelete(address) + if !ok { + logutil.BgLogger().Warn("Store is deleted", zap.String("address", address), zap.Any("isok", ok)) + } } // MPPStore detect function -func detectMPPStore(ctx context.Context, client tikv.Client, address string) error { +func detectMPPStore(ctx context.Context, client tikv.Client, address string, detectTimeoutLimit time.Duration) error { resp, err := 
client.SendRequest(ctx, address, &tikvrpc.Request{ Type: tikvrpc.CmdMPPAlive, StoreTp: tikvrpc.TiFlash, Req: &mpp.IsAliveRequest{}, Context: kvrpcpb.Context{}, - }, DetectTimeoutLimit) + }, detectTimeoutLimit) if err != nil || !resp.Resp.(*mpp.IsAliveResponse).Available { if err == nil { err = fmt.Errorf("store not ready to serve") @@ -213,7 +220,11 @@ func detectMPPStore(ctx context.Context, client tikv.Client, address string) err func init() { globalMPPFailedStoreProbe = &MPPFailedStoreProbe{ - failedMPPStores: &sync.Map{}, - lock: &sync.Mutex{}, + failedMPPStores: &sync.Map{}, + lock: &sync.Mutex{}, + detectPeriod: DetectPeriod, + detectTimeoutLimit: DetectTimeoutLimit, + maxRecoveryTimeLimit: MaxRecoveryTimeLimit, + maxObsoletTimeLimit: MaxObsoletTimeLimit, } } diff --git a/store/copr/mpp_probe_test.go b/store/copr/mpp_probe_test.go new file mode 100644 index 0000000000000..8ee42571f094e --- /dev/null +++ b/store/copr/mpp_probe_test.go @@ -0,0 +1,164 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package copr + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/mpp" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikvrpc" +) + +const ( + testimeout = "timeout" + Error = "error" + Normal = "normal" +) + +type mockDetectClient struct { + errortestype string +} + +func (t *mockDetectClient) CloseAddr(string) error { + return nil +} + +func (t *mockDetectClient) Close() error { + return nil +} + +func (t *mockDetectClient) SendRequest( + ctx context.Context, + addr string, + req *tikvrpc.Request, + timeout time.Duration, +) (*tikvrpc.Response, error) { + + if t.errortestype == Error { + return nil, errors.New("store error") + } else if t.errortestype == testimeout { + return &tikvrpc.Response{Resp: &mpp.IsAliveResponse{}}, nil + } + + return &tikvrpc.Response{Resp: &mpp.IsAliveResponse{Available: true}}, nil + +} + +type ProbeTest map[string]*mockDetectClient + +func (t ProbeTest) add(ctx context.Context) { + for k, v := range t { + globalMPPFailedStoreProbe.Add(ctx, k, v) + } +} + +func (t ProbeTest) reSetErrortestype(to string) { + for k, v := range t { + if to == Normal { + v.errortestype = Normal + } else { + v.errortestype = k + } + } +} + +func (t ProbeTest) judge(ctx context.Context, test *testing.T, recoverytesttestL time.Duration, need bool) { + for k := range t { + ok := globalMPPFailedStoreProbe.IsRecovery(ctx, k, recoverytesttestL) + require.Equal(test, need, ok) + } +} + +func failedStoreSizeJudge(ctx context.Context, test *testing.T, need int) { + var l int + globalMPPFailedStoreProbe.scan(ctx) + time.Sleep(time.Second / 10) + globalMPPFailedStoreProbe.failedMPPStores.Range(func(k, v interface{}) bool { + l++ + return true + }) + require.Equal(test, need, l) +} + +func testFlow(ctx context.Context, probetestest ProbeTest, test *testing.T, flow []string) { + probetestest.add(ctx) + for _, to := range flow { + 
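+		// Editorial note (not part of the original patch): each step switches every mock client to
+		// the behaviour named by `to`, re-runs a scan, and then asserts the expected recovery state.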
probetestest.reSetErrortestype(to) + + globalMPPFailedStoreProbe.scan(ctx) + time.Sleep(time.Second / 10) //wait detect goroutine finish + + var need bool + if to == Normal { + need = true + } + probetestest.judge(ctx, test, 0, need) + probetestest.judge(ctx, test, time.Minute, false) + } + lastTo := flow[len(flow)-1] + cleanRecover := func(need int) { + globalMPPFailedStoreProbe.maxRecoveryTimeLimit = 0 - time.Second + failedStoreSizeJudge(ctx, test, need) + globalMPPFailedStoreProbe.maxRecoveryTimeLimit = MaxRecoveryTimeLimit + } + + cleanObsolet := func(need int) { + globalMPPFailedStoreProbe.maxObsoletTimeLimit = 0 - time.Second + failedStoreSizeJudge(ctx, test, need) + globalMPPFailedStoreProbe.maxObsoletTimeLimit = MaxObsoletTimeLimit + } + + if lastTo == Error { + cleanRecover(2) + cleanObsolet(0) + } else if lastTo == Normal { + cleanObsolet(2) + cleanRecover(0) + } + +} + +func TestMPPFailedStoreProbe(t *testing.T) { + ctx := context.Background() + + notExistAddress := "not exist address" + + globalMPPFailedStoreProbe.detectPeriod = 0 - time.Second + + // Confirm that multiple tasks are not allowed + globalMPPFailedStoreProbe.lock.Lock() + globalMPPFailedStoreProbe.Run() + + // check not exist address + ok := globalMPPFailedStoreProbe.IsRecovery(ctx, notExistAddress, 0) + require.True(t, ok) + + globalMPPFailedStoreProbe.scan(ctx) + + probetestest := map[string]*mockDetectClient{ + testimeout: {errortestype: testimeout}, + Error: {errortestype: Error}, + } + + testFlowFinallyRecover := []string{Error, Normal, Error, Error, Normal} + testFlow(ctx, probetestest, t, testFlowFinallyRecover) + testFlowFinallyDesert := []string{Error, Normal, Normal, Error, Error} + testFlow(ctx, probetestest, t, testFlowFinallyDesert) + +} diff --git a/store/copr/store.go b/store/copr/store.go index 758109b81d805..77728f474d684 100644 --- a/store/copr/store.go +++ b/store/copr/store.go @@ -113,6 +113,8 @@ func (s *Store) GetClient() kv.Client { // GetMPPClient gets a mpp client instance. 
func (s *Store) GetMPPClient() kv.MPPClient { + // run a background probe process,if not start + globalMPPFailedStoreProbe.Run() return &MPPClient{ store: s.kvStore, } From 0c9ff8e193044b16819dc999a83e3a024e353de3 Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Mon, 19 Dec 2022 22:34:50 +0800 Subject: [PATCH 07/19] use store manage background goroutine --- store/copr/mpp_probe.go | 14 +++++++++++++- store/copr/mpp_probe_test.go | 3 +-- store/copr/store.go | 7 +++++-- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index 8456972a44144..431959424bd97 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -58,6 +58,8 @@ type MPPSotreState struct { type MPPFailedStoreProbe struct { failedMPPStores *sync.Map lock *sync.Mutex + ctx context.Context + cancel context.CancelFunc detectPeriod time.Duration detectTimeoutLimit time.Duration @@ -176,10 +178,11 @@ func (t *MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, re // Run a loop of scan // there can be only one background task -func (t *MPPFailedStoreProbe) Run() { +func (t *MPPFailedStoreProbe) run() { if !t.lock.TryLock() { return } + go func() { defer t.lock.Unlock() ticker := time.NewTicker(time.Second) @@ -187,6 +190,8 @@ func (t *MPPFailedStoreProbe) Run() { for { select { + case <-t.ctx.Done(): + return case <-ticker.C: t.scan(context.Background()) } @@ -194,6 +199,10 @@ func (t *MPPFailedStoreProbe) Run() { }() } +func (t *MPPFailedStoreProbe) stop() { + t.cancel() +} + // Delete clean store from failed map func (t *MPPFailedStoreProbe) Delete(address string) { metrics.TiFlashFailedMPPStoreState.DeleteLabelValues(address) @@ -224,9 +233,12 @@ func detectMPPStore(ctx context.Context, client tikv.Client, address string, det } func init() { + ctx, cancel := context.WithCancel(context.Background()) globalMPPFailedStoreProbe = &MPPFailedStoreProbe{ failedMPPStores: &sync.Map{}, lock: &sync.Mutex{}, + ctx: ctx, + cancel: cancel, detectPeriod: DetectPeriod, detectTimeoutLimit: DetectTimeoutLimit, maxRecoveryTimeLimit: MaxRecoveryTimeLimit, diff --git a/store/copr/mpp_probe_test.go b/store/copr/mpp_probe_test.go index 55431e1b6c02f..fb31fc423dd60 100644 --- a/store/copr/mpp_probe_test.go +++ b/store/copr/mpp_probe_test.go @@ -56,7 +56,6 @@ func (t *mockDetectClient) SendRequest( } return &tikvrpc.Response{Resp: &mpp.IsAliveResponse{Available: true}}, nil - } type ProbeTest map[string]*mockDetectClient @@ -141,7 +140,7 @@ func TestMPPFailedStoreProbe(t *testing.T) { // Confirm that multiple tasks are not allowed globalMPPFailedStoreProbe.lock.Lock() - globalMPPFailedStoreProbe.Run() + globalMPPFailedStoreProbe.run() // check not exist address ok := globalMPPFailedStoreProbe.IsRecovery(ctx, notExistAddress, 0) diff --git a/store/copr/store.go b/store/copr/store.go index 77728f474d684..e869164eb40d3 100644 --- a/store/copr/store.go +++ b/store/copr/store.go @@ -84,6 +84,10 @@ func NewStore(s *tikv.KVStore, coprCacheConfig *config.CoprocessorCache) (*Store if err != nil { return nil, errors.Trace(err) } + + // run a background probe process for mpp store + globalMPPFailedStoreProbe.run() + /* #nosec G404 */ return &Store{ kvStore: &kvStore{store: s}, @@ -95,6 +99,7 @@ func NewStore(s *tikv.KVStore, coprCacheConfig *config.CoprocessorCache) (*Store // Close releases resources allocated for coprocessor. 
func (s *Store) Close() { if s.coprCache != nil { + globalMPPFailedStoreProbe.stop() s.coprCache.cache.Close() } } @@ -113,8 +118,6 @@ func (s *Store) GetClient() kv.Client { // GetMPPClient gets a mpp client instance. func (s *Store) GetMPPClient() kv.MPPClient { - // run a background probe process,if not start - globalMPPFailedStoreProbe.Run() return &MPPClient{ store: s.kvStore, } From 7d2705dc28793ca8b28c79acb2ea317fecf54246 Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Tue, 20 Dec 2022 10:17:44 +0800 Subject: [PATCH 08/19] fix DATA RACE --- store/copr/mpp_probe.go | 26 ++++++++++---------------- store/copr/mpp_probe_test.go | 7 ++++--- store/copr/store.go | 17 ++++++++++++----- 3 files changed, 26 insertions(+), 24 deletions(-) diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index 431959424bd97..5fcb42e4860aa 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -58,8 +58,6 @@ type MPPSotreState struct { type MPPFailedStoreProbe struct { failedMPPStores *sync.Map lock *sync.Mutex - ctx context.Context - cancel context.CancelFunc detectPeriod time.Duration detectTimeoutLimit time.Duration @@ -68,11 +66,6 @@ type MPPFailedStoreProbe struct { } func (t *MPPSotreState) detect(ctx context.Context, detectPeriod time.Duration, detectTimeoutLimit time.Duration) { - if !t.lock.TryLock() { - return - } - defer t.lock.Unlock() - if time.Since(t.lastDetectTime) < detectPeriod { return } @@ -93,6 +86,9 @@ func (t *MPPSotreState) detect(ctx context.Context, detectPeriod time.Duration, } func (t *MPPSotreState) isRecovery(ctx context.Context, recoveryTTL time.Duration) bool { + t.lock.Lock() + defer t.lock.Unlock() + t.lastLookupTime = time.Now() if !t.recoveryTime.IsZero() && time.Since(t.recoveryTime) > recoveryTTL { return true @@ -123,6 +119,11 @@ func (t MPPFailedStoreProbe) scan(ctx context.Context) { return } + if !state.lock.TryLock() { + return + } + defer state.lock.Unlock() + state.detect(ctx, t.detectPeriod, t.detectTimeoutLimit) // clean restored store @@ -178,7 +179,7 @@ func (t *MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, re // Run a loop of scan // there can be only one background task -func (t *MPPFailedStoreProbe) run() { +func (t *MPPFailedStoreProbe) run(ctx context.Context) { if !t.lock.TryLock() { return } @@ -190,7 +191,7 @@ func (t *MPPFailedStoreProbe) run() { for { select { - case <-t.ctx.Done(): + case <-ctx.Done(): return case <-ticker.C: t.scan(context.Background()) @@ -199,10 +200,6 @@ func (t *MPPFailedStoreProbe) run() { }() } -func (t *MPPFailedStoreProbe) stop() { - t.cancel() -} - // Delete clean store from failed map func (t *MPPFailedStoreProbe) Delete(address string) { metrics.TiFlashFailedMPPStoreState.DeleteLabelValues(address) @@ -233,12 +230,9 @@ func detectMPPStore(ctx context.Context, client tikv.Client, address string, det } func init() { - ctx, cancel := context.WithCancel(context.Background()) globalMPPFailedStoreProbe = &MPPFailedStoreProbe{ failedMPPStores: &sync.Map{}, lock: &sync.Mutex{}, - ctx: ctx, - cancel: cancel, detectPeriod: DetectPeriod, detectTimeoutLimit: DetectTimeoutLimit, maxRecoveryTimeLimit: MaxRecoveryTimeLimit, diff --git a/store/copr/mpp_probe_test.go b/store/copr/mpp_probe_test.go index fb31fc423dd60..ca363218f0eac 100644 --- a/store/copr/mpp_probe_test.go +++ b/store/copr/mpp_probe_test.go @@ -76,9 +76,9 @@ func (t ProbeTest) reSetErrortestype(to string) { } } -func (t ProbeTest) judge(ctx context.Context, test *testing.T, recoverytesttestL time.Duration, need bool) { 
+func (t ProbeTest) judge(ctx context.Context, test *testing.T, recoveryTTL time.Duration, need bool) { for k := range t { - ok := globalMPPFailedStoreProbe.IsRecovery(ctx, k, recoverytesttestL) + ok := globalMPPFailedStoreProbe.IsRecovery(ctx, k, recoveryTTL) require.Equal(test, need, ok) } } @@ -109,6 +109,7 @@ func testFlow(ctx context.Context, probetestest ProbeTest, test *testing.T, flow probetestest.judge(ctx, test, 0, need) probetestest.judge(ctx, test, time.Minute, false) } + lastTo := flow[len(flow)-1] cleanRecover := func(need int) { globalMPPFailedStoreProbe.maxRecoveryTimeLimit = 0 - time.Second @@ -140,7 +141,7 @@ func TestMPPFailedStoreProbe(t *testing.T) { // Confirm that multiple tasks are not allowed globalMPPFailedStoreProbe.lock.Lock() - globalMPPFailedStoreProbe.run() + globalMPPFailedStoreProbe.run(ctx) // check not exist address ok := globalMPPFailedStoreProbe.IsRecovery(ctx, notExistAddress, 0) diff --git a/store/copr/store.go b/store/copr/store.go index e869164eb40d3..e0b2bebc6a1d0 100644 --- a/store/copr/store.go +++ b/store/copr/store.go @@ -17,6 +17,7 @@ package copr import ( "context" "math/rand" + "runtime" "sync/atomic" "time" @@ -85,21 +86,27 @@ func NewStore(s *tikv.KVStore, coprCacheConfig *config.CoprocessorCache) (*Store return nil, errors.Trace(err) } + ctx, cancel := context.WithCancel(context.Background()) // run a background probe process for mpp store - globalMPPFailedStoreProbe.run() + globalMPPFailedStoreProbe.run(ctx) - /* #nosec G404 */ - return &Store{ + store := &Store{ kvStore: &kvStore{store: s}, coprCache: coprCache, replicaReadSeed: rand.Uint32(), - }, nil + } + + runtime.SetFinalizer(store, func(s *Store) { + cancel() + }) + + /* #nosec G404 */ + return store, nil } // Close releases resources allocated for coprocessor. 
func (s *Store) Close() { if s.coprCache != nil { - globalMPPFailedStoreProbe.stop() s.coprCache.cache.Close() } } From 3864a7e24e3ed9df6130cdc4adaa4086f5b6e33c Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Tue, 20 Dec 2022 17:24:29 +0800 Subject: [PATCH 09/19] stop background goroutine when store close --- store/copr/mpp_probe.go | 16 ++++++++++++++-- store/copr/mpp_probe_test.go | 2 +- store/copr/store.go | 19 ++++++------------- .../sessiontest/session_fail_test.go | 1 - 4 files changed, 21 insertions(+), 17 deletions(-) diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index 5fcb42e4860aa..ccbafa1b40593 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -58,6 +58,8 @@ type MPPSotreState struct { type MPPFailedStoreProbe struct { failedMPPStores *sync.Map lock *sync.Mutex + ctx context.Context + cancel context.CancelFunc detectPeriod time.Duration detectTimeoutLimit time.Duration @@ -179,7 +181,7 @@ func (t *MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, re // Run a loop of scan // there can be only one background task -func (t *MPPFailedStoreProbe) run(ctx context.Context) { +func (t *MPPFailedStoreProbe) run() { if !t.lock.TryLock() { return } @@ -191,7 +193,8 @@ func (t *MPPFailedStoreProbe) run(ctx context.Context) { for { select { - case <-ctx.Done(): + case <-t.ctx.Done(): + logutil.BgLogger().Debug("ctx.done") return case <-ticker.C: t.scan(context.Background()) @@ -200,6 +203,12 @@ func (t *MPPFailedStoreProbe) run(ctx context.Context) { }() } +// Delete clean store from failed map +func (t *MPPFailedStoreProbe) stop() { + logutil.BgLogger().Debug("stop background task") + t.cancel() +} + // Delete clean store from failed map func (t *MPPFailedStoreProbe) Delete(address string) { metrics.TiFlashFailedMPPStoreState.DeleteLabelValues(address) @@ -230,9 +239,12 @@ func detectMPPStore(ctx context.Context, client tikv.Client, address string, det } func init() { + ctx, cancel := context.WithCancel(context.Background()) globalMPPFailedStoreProbe = &MPPFailedStoreProbe{ failedMPPStores: &sync.Map{}, lock: &sync.Mutex{}, + ctx: ctx, + cancel: cancel, detectPeriod: DetectPeriod, detectTimeoutLimit: DetectTimeoutLimit, maxRecoveryTimeLimit: MaxRecoveryTimeLimit, diff --git a/store/copr/mpp_probe_test.go b/store/copr/mpp_probe_test.go index ca363218f0eac..81f2ff698a6a6 100644 --- a/store/copr/mpp_probe_test.go +++ b/store/copr/mpp_probe_test.go @@ -141,7 +141,7 @@ func TestMPPFailedStoreProbe(t *testing.T) { // Confirm that multiple tasks are not allowed globalMPPFailedStoreProbe.lock.Lock() - globalMPPFailedStoreProbe.run(ctx) + globalMPPFailedStoreProbe.run() // check not exist address ok := globalMPPFailedStoreProbe.IsRecovery(ctx, notExistAddress, 0) diff --git a/store/copr/store.go b/store/copr/store.go index e0b2bebc6a1d0..e9112e7a4252e 100644 --- a/store/copr/store.go +++ b/store/copr/store.go @@ -17,7 +17,6 @@ package copr import ( "context" "math/rand" - "runtime" "sync/atomic" "time" @@ -86,27 +85,21 @@ func NewStore(s *tikv.KVStore, coprCacheConfig *config.CoprocessorCache) (*Store return nil, errors.Trace(err) } - ctx, cancel := context.WithCancel(context.Background()) - // run a background probe process for mpp store - globalMPPFailedStoreProbe.run(ctx) + // run a background probe process for mpp + globalMPPFailedStoreProbe.run() - store := &Store{ + /* #nosec G404 */ + return &Store{ kvStore: &kvStore{store: s}, coprCache: coprCache, replicaReadSeed: rand.Uint32(), - } - - runtime.SetFinalizer(store, func(s *Store) { - 
cancel() - }) - - /* #nosec G404 */ - return store, nil + }, nil } // Close releases resources allocated for coprocessor. func (s *Store) Close() { if s.coprCache != nil { + globalMPPFailedStoreProbe.stop() s.coprCache.cache.Close() } } diff --git a/tests/realtikvtest/sessiontest/session_fail_test.go b/tests/realtikvtest/sessiontest/session_fail_test.go index 919932e6f9357..a3df51be821c0 100644 --- a/tests/realtikvtest/sessiontest/session_fail_test.go +++ b/tests/realtikvtest/sessiontest/session_fail_test.go @@ -200,7 +200,6 @@ func TestAutoCommitNeedNotLinearizability(t *testing.T) { func TestKill(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) - tk := testkit.NewTestKit(t, store) tk.MustExec("kill connection_id();") } From 4275876aa8d54efa75a1474c63de1e512957ba5d Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Tue, 20 Dec 2022 18:41:45 +0800 Subject: [PATCH 10/19] add log for store --- store/copr/mpp_probe.go | 2 +- store/copr/store.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index ccbafa1b40593..b03575d2d7433 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -194,7 +194,7 @@ func (t *MPPFailedStoreProbe) run() { for { select { case <-t.ctx.Done(): - logutil.BgLogger().Debug("ctx.done") + logutil.BgLogger().Info("ctx.done") return case <-ticker.C: t.scan(context.Background()) diff --git a/store/copr/store.go b/store/copr/store.go index e9112e7a4252e..9c84db6d6769c 100644 --- a/store/copr/store.go +++ b/store/copr/store.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/driver/backoff" derr "github.com/pingcap/tidb/store/driver/error" + "github.com/pingcap/tidb/util/logutil" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" @@ -87,6 +88,7 @@ func NewStore(s *tikv.KVStore, coprCacheConfig *config.CoprocessorCache) (*Store // run a background probe process for mpp globalMPPFailedStoreProbe.run() + logutil.BgLogger().Info("run a background probe process for mpp") /* #nosec G404 */ return &Store{ From 1232ccb0812551376f62e2a3c42491e9c61b724e Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Tue, 20 Dec 2022 20:23:41 +0800 Subject: [PATCH 11/19] fix goleak in unittest --- store/copr/mpp_probe.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index b03575d2d7433..038db1efb88af 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -58,6 +58,7 @@ type MPPSotreState struct { type MPPFailedStoreProbe struct { failedMPPStores *sync.Map lock *sync.Mutex + wg *sync.WaitGroup ctx context.Context cancel context.CancelFunc @@ -186,6 +187,7 @@ func (t *MPPFailedStoreProbe) run() { return } + t.wg.Add(1) go func() { defer t.lock.Unlock() ticker := time.NewTicker(time.Second) @@ -195,6 +197,7 @@ func (t *MPPFailedStoreProbe) run() { select { case <-t.ctx.Done(): logutil.BgLogger().Info("ctx.done") + t.wg.Done() return case <-ticker.C: t.scan(context.Background()) @@ -205,8 +208,10 @@ func (t *MPPFailedStoreProbe) run() { // Delete clean store from failed map func (t *MPPFailedStoreProbe) stop() { - logutil.BgLogger().Debug("stop background task") + logutil.BgLogger().Info("stop background task") t.cancel() + t.wg.Wait() + } // Delete clean store from failed map @@ -245,6 +250,7 @@ func init() { lock: &sync.Mutex{}, ctx: ctx, cancel: cancel, + wg: &sync.WaitGroup{}, detectPeriod: 
DetectPeriod, detectTimeoutLimit: DetectTimeoutLimit, maxRecoveryTimeLimit: MaxRecoveryTimeLimit, From 1fe1b6f8adc5d6735cf2f5e87e9d1451950446f8 Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Tue, 20 Dec 2022 20:46:57 +0800 Subject: [PATCH 12/19] move wg.Done to defer --- store/copr/mpp_probe.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index 038db1efb88af..ef3ea9a45cb86 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -192,12 +192,11 @@ func (t *MPPFailedStoreProbe) run() { defer t.lock.Unlock() ticker := time.NewTicker(time.Second) defer ticker.Stop() - + defer t.wg.Done() for { select { case <-t.ctx.Done(): logutil.BgLogger().Info("ctx.done") - t.wg.Done() return case <-ticker.C: t.scan(context.Background()) From 93ae0f3ad70da15428a87f9a8b2a58771d0f5d62 Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Tue, 20 Dec 2022 21:18:48 +0800 Subject: [PATCH 13/19] add logs --- store/copr/store.go | 1 + 1 file changed, 1 insertion(+) diff --git a/store/copr/store.go b/store/copr/store.go index 9c84db6d6769c..6027a4228500e 100644 --- a/store/copr/store.go +++ b/store/copr/store.go @@ -100,6 +100,7 @@ func NewStore(s *tikv.KVStore, coprCacheConfig *config.CoprocessorCache) (*Store // Close releases resources allocated for coprocessor. func (s *Store) Close() { + logutil.BgLogger().Info("store close") if s.coprCache != nil { globalMPPFailedStoreProbe.stop() s.coprCache.cache.Close() From b4347c688892eae6049c8a970c73a8d98426c31d Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Tue, 20 Dec 2022 21:18:48 +0800 Subject: [PATCH 14/19] add logs --- store/copr/mpp_probe.go | 7 ++++++- store/copr/store.go | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index d1b6b303e24e5..adeb2265a4e67 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -61,6 +61,7 @@ type MPPFailedStoreProbe struct { wg *sync.WaitGroup ctx context.Context cancel context.CancelFunc + isRun bool detectPeriod time.Duration detectTimeoutLimit time.Duration @@ -186,9 +187,10 @@ func (t *MPPFailedStoreProbe) run() { if !t.lock.TryLock() { return } - + t.isRun = true t.wg.Add(1) go func() { + defer func() { t.isRun = false }() defer t.wg.Done() defer t.lock.Unlock() ticker := time.NewTicker(time.Second) @@ -208,6 +210,9 @@ func (t *MPPFailedStoreProbe) run() { // Delete clean store from failed map func (t *MPPFailedStoreProbe) stop() { + if t.isRun == false { + return + } logutil.BgLogger().Info("stop background task") t.cancel() t.wg.Wait() diff --git a/store/copr/store.go b/store/copr/store.go index 9c84db6d6769c..ae166e97cb695 100644 --- a/store/copr/store.go +++ b/store/copr/store.go @@ -100,8 +100,9 @@ func NewStore(s *tikv.KVStore, coprCacheConfig *config.CoprocessorCache) (*Store // Close releases resources allocated for coprocessor. 
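Patches 11/19 through 14/19 above add a sync.WaitGroup purely so that stopping the prober can wait for the scan goroutine to actually return; without that, unit tests guarded by goleak report the still-running goroutine as a leak. Moving wg.Done into a defer guarantees it fires on every exit path. A compact sketch of the same shutdown handshake, under illustrative names:

package main

import (
	"context"
	"sync"
	"time"
)

type worker struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func startWorker() *worker {
	ctx, cancel := context.WithCancel(context.Background())
	w := &worker{ctx: ctx, cancel: cancel}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done() // deferred, so it runs no matter how the loop exits
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-w.ctx.Done():
				return
			case <-ticker.C:
				// periodic work goes here
			}
		}
	}()
	return w
}

// stop cancels the context and blocks until the goroutine has returned,
// so a leak detector running right afterwards sees nothing outstanding.
func (w *worker) stop() {
	w.cancel()
	w.wg.Wait()
}

func main() {
	w := startWorker()
	time.Sleep(300 * time.Millisecond)
	w.stop()
}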
func (s *Store) Close() { + logutil.BgLogger().Info("store close") + globalMPPFailedStoreProbe.stop() if s.coprCache != nil { - globalMPPFailedStoreProbe.stop() s.coprCache.cache.Close() } } From 454028b1fc7342aeb4eec08d2b2aa454d4cff078 Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Tue, 20 Dec 2022 21:40:51 +0800 Subject: [PATCH 15/19] fix unitest goleak --- store/copr/mpp_probe.go | 13 ++++++++----- store/copr/mpp_probe_test.go | 24 ++++++++++++++++++++---- store/copr/store.go | 4 +--- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index adeb2265a4e67..7d46b47d4013c 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/pingcap/kvproto/pkg/kvrpcpb" @@ -58,10 +59,10 @@ type MPPSotreState struct { type MPPFailedStoreProbe struct { failedMPPStores *sync.Map lock *sync.Mutex + isStop *atomic.Bool wg *sync.WaitGroup ctx context.Context cancel context.CancelFunc - isRun bool detectPeriod time.Duration detectTimeoutLimit time.Duration @@ -187,10 +188,9 @@ func (t *MPPFailedStoreProbe) run() { if !t.lock.TryLock() { return } - t.isRun = true t.wg.Add(1) + t.isStop.Swap(false) go func() { - defer func() { t.isRun = false }() defer t.wg.Done() defer t.lock.Unlock() ticker := time.NewTicker(time.Second) @@ -202,7 +202,7 @@ func (t *MPPFailedStoreProbe) run() { logutil.BgLogger().Info("ctx.done") return case <-ticker.C: - t.scan(context.Background()) + t.scan(t.ctx) } } }() @@ -210,7 +210,7 @@ func (t *MPPFailedStoreProbe) run() { // Delete clean store from failed map func (t *MPPFailedStoreProbe) stop() { - if t.isRun == false { + if !t.isStop.CompareAndSwap(false, true) { return } logutil.BgLogger().Info("stop background task") @@ -249,9 +249,12 @@ func detectMPPStore(ctx context.Context, client tikv.Client, address string, det func init() { ctx, cancel := context.WithCancel(context.Background()) + isStop := atomic.Bool{} + isStop.Swap(true) globalMPPFailedStoreProbe = &MPPFailedStoreProbe{ failedMPPStores: &sync.Map{}, lock: &sync.Mutex{}, + isStop: &isStop, ctx: ctx, cancel: cancel, wg: &sync.WaitGroup{}, diff --git a/store/copr/mpp_probe_test.go b/store/copr/mpp_probe_test.go index 81f2ff698a6a6..0e2d14fcf5ea5 100644 --- a/store/copr/mpp_probe_test.go +++ b/store/copr/mpp_probe_test.go @@ -139,10 +139,6 @@ func TestMPPFailedStoreProbe(t *testing.T) { globalMPPFailedStoreProbe.detectPeriod = 0 - time.Second - // Confirm that multiple tasks are not allowed - globalMPPFailedStoreProbe.lock.Lock() - globalMPPFailedStoreProbe.run() - // check not exist address ok := globalMPPFailedStoreProbe.IsRecovery(ctx, notExistAddress, 0) require.True(t, ok) @@ -159,3 +155,23 @@ func TestMPPFailedStoreProbe(t *testing.T) { testFlowFinallyDesert := []string{Error, Normal, Normal, Error, Error} testFlow(ctx, probetestest, t, testFlowFinallyDesert) } + +func TestMPPFailedStoreProbeGoroutineTask(t *testing.T) { + // Confirm that multiple tasks are not allowed + globalMPPFailedStoreProbe.lock.Lock() + globalMPPFailedStoreProbe.run() + globalMPPFailedStoreProbe.lock.Unlock() + + globalMPPFailedStoreProbe.run() + globalMPPFailedStoreProbe.stop() +} + +func TestMPPFailedStoreAssertFailed(t *testing.T) { + ctx := context.Background() + + globalMPPFailedStoreProbe.failedMPPStores.Store("errorinfo", nil) + globalMPPFailedStoreProbe.scan(ctx) + + globalMPPFailedStoreProbe.failedMPPStores.Store("errorinfo", nil) + globalMPPFailedStoreProbe.IsRecovery(ctx, 
"errorinfo", 0) +} diff --git a/store/copr/store.go b/store/copr/store.go index 40b3d43b8fb63..252f306978e47 100644 --- a/store/copr/store.go +++ b/store/copr/store.go @@ -101,10 +101,8 @@ func NewStore(s *tikv.KVStore, coprCacheConfig *config.CoprocessorCache) (*Store // Close releases resources allocated for coprocessor. func (s *Store) Close() { logutil.BgLogger().Info("store close") -<<<<<<< HEAD globalMPPFailedStoreProbe.stop() -======= ->>>>>>> 93ae0f3ad70da15428a87f9a8b2a58771d0f5d62 + if s.coprCache != nil { s.coprCache.cache.Close() } From d3856295ebf89d9fc7c6fb6f99299892142f5c22 Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Wed, 21 Dec 2022 15:50:10 +0800 Subject: [PATCH 16/19] optimize lock for query --- store/copr/mpp_probe.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index 7d46b47d4013c..01bb49ae5c650 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -91,7 +91,9 @@ func (t *MPPSotreState) detect(ctx context.Context, detectPeriod time.Duration, } func (t *MPPSotreState) isRecovery(ctx context.Context, recoveryTTL time.Duration) bool { - t.lock.Lock() + if !t.lock.TryLock() { + return false + } defer t.lock.Unlock() t.lastLookupTime = time.Now() @@ -213,9 +215,9 @@ func (t *MPPFailedStoreProbe) stop() { if !t.isStop.CompareAndSwap(false, true) { return } - logutil.BgLogger().Info("stop background task") t.cancel() t.wg.Wait() + logutil.BgLogger().Info("stop background task") } // Delete clean store from failed map From 616973de64929ee94ec5bee501fc0887f41e6950 Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Wed, 21 Dec 2022 18:22:35 +0800 Subject: [PATCH 17/19] move background goroutine to main --- store/copr/batch_coprocessor.go | 4 ++-- store/copr/mpp_probe.go | 22 ++++++++--------- store/copr/mpp_probe_test.go | 42 ++++++++++++++++----------------- store/copr/store.go | 8 ------- tidb-server/BUILD.bazel | 1 + tidb-server/main.go | 3 +++ 6 files changed, 38 insertions(+), 42 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index c7158d29dec42..2da522b895578 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -335,7 +335,7 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks [] s := stores[idx] // check if store is failed already. 
- ok := globalMPPFailedStoreProbe.IsRecovery(ctx, s.GetAddr(), ttl) + ok := GlobalMPPFailedStoreProbe.IsRecovery(ctx, s.GetAddr(), ttl) if !ok { return } @@ -343,7 +343,7 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks [] tikvClient := kvStore.GetTiKVClient() err := detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit) if err != nil { - globalMPPFailedStoreProbe.Add(ctx, s.GetAddr(), tikvClient) + GlobalMPPFailedStoreProbe.Add(ctx, s.GetAddr(), tikvClient) return } diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index 01bb49ae5c650..2d36b59a96333 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -30,7 +30,8 @@ import ( "go.uber.org/zap" ) -var globalMPPFailedStoreProbe *MPPFailedStoreProbe +// GlobalMPPFailedStoreProbe mpp failed store probe +var GlobalMPPFailedStoreProbe *MPPFailedStoreProbe const ( // DetectPeriod detect period @@ -120,8 +121,7 @@ func (t MPPFailedStoreProbe) scan(ctx context.Context) { state, ok := v.(*MPPSotreState) if !ok { logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean", - zap.String("address", address), - zap.Any("state", v)) + zap.String("address", address)) t.Delete(address) return } @@ -175,8 +175,7 @@ func (t *MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, re state, ok := v.(*MPPSotreState) if !ok { logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean", - zap.String("address", address), - zap.Any("state", v)) + zap.String("address", address)) t.Delete(address) return false } @@ -186,7 +185,7 @@ func (t *MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, re // Run a loop of scan // there can be only one background task -func (t *MPPFailedStoreProbe) run() { +func (t *MPPFailedStoreProbe) Run() { if !t.lock.TryLock() { return } @@ -201,23 +200,24 @@ func (t *MPPFailedStoreProbe) run() { for { select { case <-t.ctx.Done(): - logutil.BgLogger().Info("ctx.done") + logutil.BgLogger().Debug("ctx.done") return case <-ticker.C: t.scan(t.ctx) } } }() + logutil.BgLogger().Debug("run a background probe process for mpp") } -// Delete clean store from failed map -func (t *MPPFailedStoreProbe) stop() { +// Stop stop background goroutine +func (t *MPPFailedStoreProbe) Stop() { if !t.isStop.CompareAndSwap(false, true) { return } t.cancel() t.wg.Wait() - logutil.BgLogger().Info("stop background task") + logutil.BgLogger().Debug("stop background task") } // Delete clean store from failed map @@ -253,7 +253,7 @@ func init() { ctx, cancel := context.WithCancel(context.Background()) isStop := atomic.Bool{} isStop.Swap(true) - globalMPPFailedStoreProbe = &MPPFailedStoreProbe{ + GlobalMPPFailedStoreProbe = &MPPFailedStoreProbe{ failedMPPStores: &sync.Map{}, lock: &sync.Mutex{}, isStop: &isStop, diff --git a/store/copr/mpp_probe_test.go b/store/copr/mpp_probe_test.go index 0e2d14fcf5ea5..d18c18dda3147 100644 --- a/store/copr/mpp_probe_test.go +++ b/store/copr/mpp_probe_test.go @@ -62,7 +62,7 @@ type ProbeTest map[string]*mockDetectClient func (t ProbeTest) add(ctx context.Context) { for k, v := range t { - globalMPPFailedStoreProbe.Add(ctx, k, v) + GlobalMPPFailedStoreProbe.Add(ctx, k, v) } } @@ -78,16 +78,16 @@ func (t ProbeTest) reSetErrortestype(to string) { func (t ProbeTest) judge(ctx context.Context, test *testing.T, recoveryTTL time.Duration, need bool) { for k := range t { - ok := globalMPPFailedStoreProbe.IsRecovery(ctx, k, recoveryTTL) + ok := GlobalMPPFailedStoreProbe.IsRecovery(ctx, k, recoveryTTL) 
require.Equal(test, need, ok) } } func failedStoreSizeJudge(ctx context.Context, test *testing.T, need int) { var l int - globalMPPFailedStoreProbe.scan(ctx) + GlobalMPPFailedStoreProbe.scan(ctx) time.Sleep(time.Second / 10) - globalMPPFailedStoreProbe.failedMPPStores.Range(func(k, v interface{}) bool { + GlobalMPPFailedStoreProbe.failedMPPStores.Range(func(k, v interface{}) bool { l++ return true }) @@ -99,7 +99,7 @@ func testFlow(ctx context.Context, probetestest ProbeTest, test *testing.T, flow for _, to := range flow { probetestest.reSetErrortestype(to) - globalMPPFailedStoreProbe.scan(ctx) + GlobalMPPFailedStoreProbe.scan(ctx) time.Sleep(time.Second / 10) //wait detect goroutine finish var need bool @@ -112,15 +112,15 @@ func testFlow(ctx context.Context, probetestest ProbeTest, test *testing.T, flow lastTo := flow[len(flow)-1] cleanRecover := func(need int) { - globalMPPFailedStoreProbe.maxRecoveryTimeLimit = 0 - time.Second + GlobalMPPFailedStoreProbe.maxRecoveryTimeLimit = 0 - time.Second failedStoreSizeJudge(ctx, test, need) - globalMPPFailedStoreProbe.maxRecoveryTimeLimit = MaxRecoveryTimeLimit + GlobalMPPFailedStoreProbe.maxRecoveryTimeLimit = MaxRecoveryTimeLimit } cleanObsolet := func(need int) { - globalMPPFailedStoreProbe.maxObsoletTimeLimit = 0 - time.Second + GlobalMPPFailedStoreProbe.maxObsoletTimeLimit = 0 - time.Second failedStoreSizeJudge(ctx, test, need) - globalMPPFailedStoreProbe.maxObsoletTimeLimit = MaxObsoletTimeLimit + GlobalMPPFailedStoreProbe.maxObsoletTimeLimit = MaxObsoletTimeLimit } if lastTo == Error { @@ -137,13 +137,13 @@ func TestMPPFailedStoreProbe(t *testing.T) { notExistAddress := "not exist address" - globalMPPFailedStoreProbe.detectPeriod = 0 - time.Second + GlobalMPPFailedStoreProbe.detectPeriod = 0 - time.Second // check not exist address - ok := globalMPPFailedStoreProbe.IsRecovery(ctx, notExistAddress, 0) + ok := GlobalMPPFailedStoreProbe.IsRecovery(ctx, notExistAddress, 0) require.True(t, ok) - globalMPPFailedStoreProbe.scan(ctx) + GlobalMPPFailedStoreProbe.scan(ctx) probetestest := map[string]*mockDetectClient{ testimeout: {errortestype: testimeout}, @@ -158,20 +158,20 @@ func TestMPPFailedStoreProbe(t *testing.T) { func TestMPPFailedStoreProbeGoroutineTask(t *testing.T) { // Confirm that multiple tasks are not allowed - globalMPPFailedStoreProbe.lock.Lock() - globalMPPFailedStoreProbe.run() - globalMPPFailedStoreProbe.lock.Unlock() + GlobalMPPFailedStoreProbe.lock.Lock() + GlobalMPPFailedStoreProbe.Run() + GlobalMPPFailedStoreProbe.lock.Unlock() - globalMPPFailedStoreProbe.run() - globalMPPFailedStoreProbe.stop() + GlobalMPPFailedStoreProbe.Run() + GlobalMPPFailedStoreProbe.Stop() } func TestMPPFailedStoreAssertFailed(t *testing.T) { ctx := context.Background() - globalMPPFailedStoreProbe.failedMPPStores.Store("errorinfo", nil) - globalMPPFailedStoreProbe.scan(ctx) + GlobalMPPFailedStoreProbe.failedMPPStores.Store("errorinfo", nil) + GlobalMPPFailedStoreProbe.scan(ctx) - globalMPPFailedStoreProbe.failedMPPStores.Store("errorinfo", nil) - globalMPPFailedStoreProbe.IsRecovery(ctx, "errorinfo", 0) + GlobalMPPFailedStoreProbe.failedMPPStores.Store("errorinfo", nil) + GlobalMPPFailedStoreProbe.IsRecovery(ctx, "errorinfo", 0) } diff --git a/store/copr/store.go b/store/copr/store.go index 252f306978e47..32553961acc67 100644 --- a/store/copr/store.go +++ b/store/copr/store.go @@ -25,7 +25,6 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/driver/backoff" derr "github.com/pingcap/tidb/store/driver/error" - 
"github.com/pingcap/tidb/util/logutil" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" @@ -86,10 +85,6 @@ func NewStore(s *tikv.KVStore, coprCacheConfig *config.CoprocessorCache) (*Store return nil, errors.Trace(err) } - // run a background probe process for mpp - globalMPPFailedStoreProbe.run() - logutil.BgLogger().Info("run a background probe process for mpp") - /* #nosec G404 */ return &Store{ kvStore: &kvStore{store: s}, @@ -100,9 +95,6 @@ func NewStore(s *tikv.KVStore, coprCacheConfig *config.CoprocessorCache) (*Store // Close releases resources allocated for coprocessor. func (s *Store) Close() { - logutil.BgLogger().Info("store close") - globalMPPFailedStoreProbe.stop() - if s.coprCache != nil { s.coprCache.cache.Close() } diff --git a/tidb-server/BUILD.bazel b/tidb-server/BUILD.bazel index 493eedfcccbfa..361a929351642 100644 --- a/tidb-server/BUILD.bazel +++ b/tidb-server/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "//sessionctx/variable", "//statistics", "//store", + "//store/copr", "//store/driver", "//store/mockstore", "//store/mockstore/unistore/metrics", diff --git a/tidb-server/main.go b/tidb-server/main.go index d58e24a531f4c..a025a2d465210 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -54,6 +54,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" kvstore "github.com/pingcap/tidb/store" + "github.com/pingcap/tidb/store/copr" "github.com/pingcap/tidb/store/driver" "github.com/pingcap/tidb/store/mockstore" uni_metrics "github.com/pingcap/tidb/store/mockstore/unistore/metrics" @@ -309,6 +310,7 @@ func createStoreAndDomain() (kv.Storage, *domain.Domain) { var err error storage, err := kvstore.New(fullPath) terror.MustNil(err) + copr.GlobalMPPFailedStoreProbe.Run() err = infosync.CheckTiKVVersion(storage, *semver.New(versioninfo.TiKVMinVersion)) terror.MustNil(err) // Bootstrap a session to load information schema. @@ -814,6 +816,7 @@ func setupTracing() { func closeDomainAndStorage(storage kv.Storage, dom *domain.Domain) { tikv.StoreShuttingDown(1) dom.Close() + copr.GlobalMPPFailedStoreProbe.Stop() err := storage.Close() terror.Log(errors.Trace(err)) } From 59b45b84e7305abc28318adb5c08c8067ecc5dcf Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Wed, 21 Dec 2022 21:44:37 +0800 Subject: [PATCH 18/19] optmize for metris and some code --- kv/mpp.go | 2 +- metrics/server.go | 2 +- planner/core/fragment.go | 2 +- store/copr/batch_coprocessor.go | 8 +++---- store/copr/mpp.go | 6 ++--- store/copr/mpp_probe.go | 36 ++++++++++++++-------------- store/copr/mpp_probe_test.go | 42 ++++++++++++++++----------------- tidb-server/main.go | 4 ++-- 8 files changed, 51 insertions(+), 51 deletions(-) diff --git a/kv/mpp.go b/kv/mpp.go index 14c6f4c5f9b5b..2e398af595650 100644 --- a/kv/mpp.go +++ b/kv/mpp.go @@ -80,7 +80,7 @@ type MPPDispatchRequest struct { type MPPClient interface { // ConstructMPPTasks schedules task for a plan fragment. // TODO:: This interface will be refined after we support more executors. - ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, bool, time.Duration) ([]MPPTaskMeta, error) + ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration) ([]MPPTaskMeta, error) // DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data. 
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64) Response diff --git a/metrics/server.go b/metrics/server.go index 9425bbf94e960..6bd34c4a8b903 100644 --- a/metrics/server.go +++ b/metrics/server.go @@ -283,7 +283,7 @@ var ( prometheus.GaugeOpts{ Namespace: "tidb", Subsystem: "server", - Name: "tiflash_failed_mpp_store", + Name: "tiflash_failed_store", Help: "Statues of failed tiflash mpp store,-1 means detector heartbeat,0 means reachable,1 means abnormal.", }, []string{LblAddress}) diff --git a/planner/core/fragment.go b/planner/core/fragment.go index 5c26009d4b2f9..917f4392d9f9e 100644 --- a/planner/core/fragment.go +++ b/planner/core/fragment.go @@ -378,7 +378,7 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic logutil.BgLogger().Warn("MPP store fail ttl is invalid", zap.Error(err)) ttl = 30 * time.Second } - metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, true, ttl) + metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, ttl) if err != nil { return nil, errors.Trace(err) } diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 2da522b895578..b316d10acaf6e 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -335,15 +335,15 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks [] s := stores[idx] // check if store is failed already. - ok := GlobalMPPFailedStoreProbe.IsRecovery(ctx, s.GetAddr(), ttl) + ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl) if !ok { return } tikvClient := kvStore.GetTiKVClient() - err := detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit) - if err != nil { - GlobalMPPFailedStoreProbe.Add(ctx, s.GetAddr(), tikvClient) + ok = detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit) + if !ok { + GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient) return } diff --git a/store/copr/mpp.go b/store/copr/mpp.go index cfbe6fb9abe00..02b66478958d4 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -62,7 +62,7 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta { } // ConstructMPPTasks receives ScheduleRequest, which are actually collects of kv ranges. We allocates MPPTaskMeta for them and returns. 
-func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest, isMPP bool, ttl time.Duration) ([]kv.MPPTaskMeta, error) { +func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest, ttl time.Duration) ([]kv.MPPTaskMeta, error) { ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS) bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil) var tasks []*batchCopTask @@ -74,13 +74,13 @@ func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasks rangesForEachPartition[i] = NewKeyRanges(p.KeyRanges) partitionIDs[i] = p.ID } - tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store, rangesForEachPartition, kv.TiFlash, isMPP, ttl, true, 20, partitionIDs) + tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store, rangesForEachPartition, kv.TiFlash, true, ttl, true, 20, partitionIDs) } else { if req.KeyRanges == nil { return c.selectAllTiFlashStore(), nil } ranges := NewKeyRanges(req.KeyRanges) - tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store, ranges, kv.TiFlash, isMPP, ttl, true, 20) + tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store, ranges, kv.TiFlash, true, ttl, true, 20) } if err != nil { diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index 2d36b59a96333..bfceff6d1a333 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -30,8 +30,8 @@ import ( "go.uber.org/zap" ) -// GlobalMPPFailedStoreProbe mpp failed store probe -var GlobalMPPFailedStoreProbe *MPPFailedStoreProbe +// GlobalMPPFailedStoreProber mpp failed store probe +var GlobalMPPFailedStoreProber *MPPFailedStoreProber const ( // DetectPeriod detect period @@ -41,7 +41,7 @@ const ( // MaxRecoveryTimeLimit wait TiFlash recovery,more than MPPStoreFailTTL MaxRecoveryTimeLimit = 15 * time.Minute // MaxObsoletTimeLimit no request for a long time,that might be obsoleted - MaxObsoletTimeLimit = 24 * time.Hour + MaxObsoletTimeLimit = time.Hour ) // MPPSotreState the state for MPPStore. @@ -56,8 +56,8 @@ type MPPSotreState struct { lastDetectTime time.Time } -// MPPFailedStoreProbe use for detecting of failed TiFlash instance -type MPPFailedStoreProbe struct { +// MPPFailedStoreProber use for detecting of failed TiFlash instance +type MPPFailedStoreProber struct { failedMPPStores *sync.Map lock *sync.Mutex isStop *atomic.Bool @@ -78,8 +78,8 @@ func (t *MPPSotreState) detect(ctx context.Context, detectPeriod time.Duration, defer func() { t.lastDetectTime = time.Now() }() metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(0) - err := detectMPPStore(ctx, t.tikvClient, t.address, detectTimeoutLimit) - if err != nil { + ok := detectMPPStore(ctx, t.tikvClient, t.address, detectTimeoutLimit) + if !ok { metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(1) t.recoveryTime = time.Time{} // if detect failed,reset recovery time to zero. 
return @@ -109,7 +109,7 @@ func (t *MPPSotreState) isRecovery(ctx context.Context, recoveryTTL time.Duratio return false } -func (t MPPFailedStoreProbe) scan(ctx context.Context) { +func (t MPPFailedStoreProber) scan(ctx context.Context) { defer func() { if r := recover(); r != nil { logutil.Logger(ctx).Warn("mpp failed store probe scan error,will restart", zap.Any("recover", r), zap.Stack("stack")) @@ -152,7 +152,7 @@ func (t MPPFailedStoreProbe) scan(ctx context.Context) { } // Add add a store when sync probe failed -func (t *MPPFailedStoreProbe) Add(ctx context.Context, address string, tikvClient tikv.Client) { +func (t *MPPFailedStoreProber) Add(ctx context.Context, address string, tikvClient tikv.Client) { state := MPPSotreState{ address: address, tikvClient: tikvClient, @@ -163,7 +163,7 @@ func (t *MPPFailedStoreProbe) Add(ctx context.Context, address string, tikvClien } // IsRecovery check whether the store is recovery -func (t *MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, recoveryTTL time.Duration) bool { +func (t *MPPFailedStoreProber) IsRecovery(ctx context.Context, address string, recoveryTTL time.Duration) bool { logutil.Logger(ctx).Debug("check failed store recovery", zap.String("address", address), zap.Duration("ttl", recoveryTTL)) v, ok := t.failedMPPStores.Load(address) @@ -185,7 +185,7 @@ func (t *MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, re // Run a loop of scan // there can be only one background task -func (t *MPPFailedStoreProbe) Run() { +func (t *MPPFailedStoreProber) Run() { if !t.lock.TryLock() { return } @@ -211,7 +211,7 @@ func (t *MPPFailedStoreProbe) Run() { } // Stop stop background goroutine -func (t *MPPFailedStoreProbe) Stop() { +func (t *MPPFailedStoreProber) Stop() { if !t.isStop.CompareAndSwap(false, true) { return } @@ -221,16 +221,16 @@ func (t *MPPFailedStoreProbe) Stop() { } // Delete clean store from failed map -func (t *MPPFailedStoreProbe) Delete(address string) { +func (t *MPPFailedStoreProber) Delete(address string) { metrics.TiFlashFailedMPPStoreState.DeleteLabelValues(address) _, ok := t.failedMPPStores.LoadAndDelete(address) if !ok { - logutil.BgLogger().Warn("Store is deleted", zap.String("address", address), zap.Any("isok", ok)) + logutil.BgLogger().Warn("Store is deleted", zap.String("address", address)) } } // MPPStore detect function -func detectMPPStore(ctx context.Context, client tikv.Client, address string, detectTimeoutLimit time.Duration) error { +func detectMPPStore(ctx context.Context, client tikv.Client, address string, detectTimeoutLimit time.Duration) bool { resp, err := client.SendRequest(ctx, address, &tikvrpc.Request{ Type: tikvrpc.CmdMPPAlive, StoreTp: tikvrpc.TiFlash, @@ -244,16 +244,16 @@ func detectMPPStore(ctx context.Context, client tikv.Client, address string, det logutil.BgLogger().Warn("Store is not ready", zap.String("store address", address), zap.String("err message", err.Error())) - return err + return false } - return nil + return true } func init() { ctx, cancel := context.WithCancel(context.Background()) isStop := atomic.Bool{} isStop.Swap(true) - GlobalMPPFailedStoreProbe = &MPPFailedStoreProbe{ + GlobalMPPFailedStoreProber = &MPPFailedStoreProber{ failedMPPStores: &sync.Map{}, lock: &sync.Mutex{}, isStop: &isStop, diff --git a/store/copr/mpp_probe_test.go b/store/copr/mpp_probe_test.go index d18c18dda3147..7826c970d3e1e 100644 --- a/store/copr/mpp_probe_test.go +++ b/store/copr/mpp_probe_test.go @@ -62,7 +62,7 @@ type ProbeTest map[string]*mockDetectClient 
func (t ProbeTest) add(ctx context.Context) { for k, v := range t { - GlobalMPPFailedStoreProbe.Add(ctx, k, v) + GlobalMPPFailedStoreProber.Add(ctx, k, v) } } @@ -78,16 +78,16 @@ func (t ProbeTest) reSetErrortestype(to string) { func (t ProbeTest) judge(ctx context.Context, test *testing.T, recoveryTTL time.Duration, need bool) { for k := range t { - ok := GlobalMPPFailedStoreProbe.IsRecovery(ctx, k, recoveryTTL) + ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, k, recoveryTTL) require.Equal(test, need, ok) } } func failedStoreSizeJudge(ctx context.Context, test *testing.T, need int) { var l int - GlobalMPPFailedStoreProbe.scan(ctx) + GlobalMPPFailedStoreProber.scan(ctx) time.Sleep(time.Second / 10) - GlobalMPPFailedStoreProbe.failedMPPStores.Range(func(k, v interface{}) bool { + GlobalMPPFailedStoreProber.failedMPPStores.Range(func(k, v interface{}) bool { l++ return true }) @@ -99,7 +99,7 @@ func testFlow(ctx context.Context, probetestest ProbeTest, test *testing.T, flow for _, to := range flow { probetestest.reSetErrortestype(to) - GlobalMPPFailedStoreProbe.scan(ctx) + GlobalMPPFailedStoreProber.scan(ctx) time.Sleep(time.Second / 10) //wait detect goroutine finish var need bool @@ -112,15 +112,15 @@ func testFlow(ctx context.Context, probetestest ProbeTest, test *testing.T, flow lastTo := flow[len(flow)-1] cleanRecover := func(need int) { - GlobalMPPFailedStoreProbe.maxRecoveryTimeLimit = 0 - time.Second + GlobalMPPFailedStoreProber.maxRecoveryTimeLimit = 0 - time.Second failedStoreSizeJudge(ctx, test, need) - GlobalMPPFailedStoreProbe.maxRecoveryTimeLimit = MaxRecoveryTimeLimit + GlobalMPPFailedStoreProber.maxRecoveryTimeLimit = MaxRecoveryTimeLimit } cleanObsolet := func(need int) { - GlobalMPPFailedStoreProbe.maxObsoletTimeLimit = 0 - time.Second + GlobalMPPFailedStoreProber.maxObsoletTimeLimit = 0 - time.Second failedStoreSizeJudge(ctx, test, need) - GlobalMPPFailedStoreProbe.maxObsoletTimeLimit = MaxObsoletTimeLimit + GlobalMPPFailedStoreProber.maxObsoletTimeLimit = MaxObsoletTimeLimit } if lastTo == Error { @@ -137,13 +137,13 @@ func TestMPPFailedStoreProbe(t *testing.T) { notExistAddress := "not exist address" - GlobalMPPFailedStoreProbe.detectPeriod = 0 - time.Second + GlobalMPPFailedStoreProber.detectPeriod = 0 - time.Second // check not exist address - ok := GlobalMPPFailedStoreProbe.IsRecovery(ctx, notExistAddress, 0) + ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, notExistAddress, 0) require.True(t, ok) - GlobalMPPFailedStoreProbe.scan(ctx) + GlobalMPPFailedStoreProber.scan(ctx) probetestest := map[string]*mockDetectClient{ testimeout: {errortestype: testimeout}, @@ -158,20 +158,20 @@ func TestMPPFailedStoreProbe(t *testing.T) { func TestMPPFailedStoreProbeGoroutineTask(t *testing.T) { // Confirm that multiple tasks are not allowed - GlobalMPPFailedStoreProbe.lock.Lock() - GlobalMPPFailedStoreProbe.Run() - GlobalMPPFailedStoreProbe.lock.Unlock() + GlobalMPPFailedStoreProber.lock.Lock() + GlobalMPPFailedStoreProber.Run() + GlobalMPPFailedStoreProber.lock.Unlock() - GlobalMPPFailedStoreProbe.Run() - GlobalMPPFailedStoreProbe.Stop() + GlobalMPPFailedStoreProber.Run() + GlobalMPPFailedStoreProber.Stop() } func TestMPPFailedStoreAssertFailed(t *testing.T) { ctx := context.Background() - GlobalMPPFailedStoreProbe.failedMPPStores.Store("errorinfo", nil) - GlobalMPPFailedStoreProbe.scan(ctx) + GlobalMPPFailedStoreProber.failedMPPStores.Store("errorinfo", nil) + GlobalMPPFailedStoreProber.scan(ctx) - GlobalMPPFailedStoreProbe.failedMPPStores.Store("errorinfo", nil) - 
GlobalMPPFailedStoreProbe.IsRecovery(ctx, "errorinfo", 0) + GlobalMPPFailedStoreProber.failedMPPStores.Store("errorinfo", nil) + GlobalMPPFailedStoreProber.IsRecovery(ctx, "errorinfo", 0) } diff --git a/tidb-server/main.go b/tidb-server/main.go index a025a2d465210..8605c7d0c3aaa 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -310,7 +310,7 @@ func createStoreAndDomain() (kv.Storage, *domain.Domain) { var err error storage, err := kvstore.New(fullPath) terror.MustNil(err) - copr.GlobalMPPFailedStoreProbe.Run() + copr.GlobalMPPFailedStoreProber.Run() err = infosync.CheckTiKVVersion(storage, *semver.New(versioninfo.TiKVMinVersion)) terror.MustNil(err) // Bootstrap a session to load information schema. @@ -816,7 +816,7 @@ func setupTracing() { func closeDomainAndStorage(storage kv.Storage, dom *domain.Domain) { tikv.StoreShuttingDown(1) dom.Close() - copr.GlobalMPPFailedStoreProbe.Stop() + copr.GlobalMPPFailedStoreProber.Stop() err := storage.Close() terror.Log(errors.Trace(err)) } From 18226b2fe33714849d8662bb4e31466c1e124cea Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Thu, 22 Dec 2022 12:04:11 +0800 Subject: [PATCH 19/19] fix some problem --- store/copr/mpp_probe.go | 54 +++++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index bfceff6d1a333..0a0eba286648e 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -44,16 +44,18 @@ const ( MaxObsoletTimeLimit = time.Hour ) -// MPPSotreState the state for MPPStore. -type MPPSotreState struct { +// MPPStoreState the state for MPPStore. +type MPPStoreState struct { address string // MPPStore TiFlash address tikvClient tikv.Client - lock sync.Mutex + lock struct { + sync.Mutex - recoveryTime time.Time - lastLookupTime time.Time - lastDetectTime time.Time + recoveryTime time.Time + lastLookupTime time.Time + lastDetectTime time.Time + } } // MPPFailedStoreProber use for detecting of failed TiFlash instance @@ -71,40 +73,40 @@ type MPPFailedStoreProber struct { maxObsoletTimeLimit time.Duration } -func (t *MPPSotreState) detect(ctx context.Context, detectPeriod time.Duration, detectTimeoutLimit time.Duration) { - if time.Since(t.lastDetectTime) < detectPeriod { +func (t *MPPStoreState) detect(ctx context.Context, detectPeriod time.Duration, detectTimeoutLimit time.Duration) { + if time.Since(t.lock.lastDetectTime) < detectPeriod { return } - defer func() { t.lastDetectTime = time.Now() }() + defer func() { t.lock.lastDetectTime = time.Now() }() metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(0) ok := detectMPPStore(ctx, t.tikvClient, t.address, detectTimeoutLimit) if !ok { metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(1) - t.recoveryTime = time.Time{} // if detect failed,reset recovery time to zero. + t.lock.recoveryTime = time.Time{} // if detect failed,reset recovery time to zero. 
return } // record the time of the first recovery - if t.recoveryTime.IsZero() { - t.recoveryTime = time.Now() + if t.lock.recoveryTime.IsZero() { + t.lock.recoveryTime = time.Now() } } -func (t *MPPSotreState) isRecovery(ctx context.Context, recoveryTTL time.Duration) bool { +func (t *MPPStoreState) isRecovery(ctx context.Context, recoveryTTL time.Duration) bool { if !t.lock.TryLock() { return false } defer t.lock.Unlock() - t.lastLookupTime = time.Now() - if !t.recoveryTime.IsZero() && time.Since(t.recoveryTime) > recoveryTTL { + t.lock.lastLookupTime = time.Now() + if !t.lock.recoveryTime.IsZero() && time.Since(t.lock.recoveryTime) > recoveryTTL { return true } logutil.Logger(ctx).Debug("Cannot detect store's availability "+ "because the current time has not recovery or wait mppStoreFailTTL", zap.String("store address", t.address), - zap.Time("recovery time", t.recoveryTime), + zap.Time("recovery time", t.lock.recoveryTime), zap.Duration("MPPStoreFailTTL", recoveryTTL)) return false } @@ -118,9 +120,9 @@ func (t MPPFailedStoreProber) scan(ctx context.Context) { do := func(k, v any) { address := fmt.Sprint(k) - state, ok := v.(*MPPSotreState) + state, ok := v.(*MPPStoreState) if !ok { - logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean", + logutil.BgLogger().Warn("MPPStoreState struct assert failed,will be clean", zap.String("address", address)) t.Delete(address) return @@ -134,10 +136,10 @@ func (t MPPFailedStoreProber) scan(ctx context.Context) { state.detect(ctx, t.detectPeriod, t.detectTimeoutLimit) // clean restored store - if !state.recoveryTime.IsZero() && time.Since(state.recoveryTime) > t.maxRecoveryTimeLimit { + if !state.lock.recoveryTime.IsZero() && time.Since(state.lock.recoveryTime) > t.maxRecoveryTimeLimit { t.Delete(address) // clean store that may be obsolete - } else if state.recoveryTime.IsZero() && time.Since(state.lastLookupTime) > t.maxObsoletTimeLimit { + } else if state.lock.recoveryTime.IsZero() && time.Since(state.lock.lastLookupTime) > t.maxObsoletTimeLimit { t.Delete(address) } } @@ -153,11 +155,11 @@ func (t MPPFailedStoreProber) scan(ctx context.Context) { // Add add a store when sync probe failed func (t *MPPFailedStoreProber) Add(ctx context.Context, address string, tikvClient tikv.Client) { - state := MPPSotreState{ - address: address, - tikvClient: tikvClient, - lastLookupTime: time.Now(), + state := MPPStoreState{ + address: address, + tikvClient: tikvClient, } + state.lock.lastLookupTime = time.Now() logutil.Logger(ctx).Debug("add mpp store to failed list", zap.String("address", address)) t.failedMPPStores.Store(address, &state) } @@ -172,9 +174,9 @@ func (t *MPPFailedStoreProber) IsRecovery(ctx context.Context, address string, r return true } - state, ok := v.(*MPPSotreState) + state, ok := v.(*MPPStoreState) if !ok { - logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean", + logutil.BgLogger().Warn("MPPStoreState struct assert failed,will be clean", zap.String("address", address)) t.Delete(address) return false
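Patch 19/19 above also regroups MPPStoreState so that the mutex sits in a nested struct together with exactly the timestamp fields it guards, which makes the locking contract visible at the declaration site and keeps the immutable fields outside the lock. A sketch of that layout with an illustrative type, not the TiDB one:

package main

import (
	"sync"
	"time"
)

type storeState struct {
	address string // set at construction, never mutated, so it needs no locking

	lock struct {
		sync.Mutex // guards everything below

		recoveryTime   time.Time
		lastLookupTime time.Time
	}
}

// touch records a lookup, but refuses to block if another goroutine
// currently holds the lock for a detection pass.
func (s *storeState) touch() bool {
	if !s.lock.TryLock() {
		return false
	}
	defer s.lock.Unlock()
	s.lock.lastLookupTime = time.Now()
	return true
}

func main() {
	s := &storeState{address: "tiflash-0:3930"} // hypothetical address
	_ = s.touch()
}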