Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support disaggregated tiflash #33535

Merged
merged 57 commits into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
e9d56f7
randomly dispatch req to tiflash-dev1
guo-shaoge Mar 29, 2022
56f98ca
distribute task randomly
guo-shaoge Mar 29, 2022
d84a99d
dispatch MPPTask to TiFlash ReadNodes using consistent hash
guo-shaoge Apr 27, 2022
f7f0578
fix fmt
guo-shaoge Apr 27, 2022
259e14f
change usage of cacheRegion.GetTiFlashMPPStores()
guo-shaoge Apr 28, 2022
a6d4681
clean buildBatchCopTasksConsistentHash() and delete useless code
guo-shaoge Apr 30, 2022
a4708e0
fix
guo-shaoge May 16, 2022
d9077e0
Merge branch 'master' of github.com:pingcap/tidb into random_dispatch…
guo-shaoge May 16, 2022
17e6abf
fix
guo-shaoge May 16, 2022
e7445bc
only coprocessor.go and mpp.go will take care of kv.TiFlashMPP.
guo-shaoge May 17, 2022
788422a
fix comment
guo-shaoge May 17, 2022
52be11e
add failpoint. change semantics of tidb_isolation_read_engines
guo-shaoge May 18, 2022
e610604
change some comments
guo-shaoge May 18, 2022
fe2c235
only invalid tiflash_mpp node cache if is grpc error
guo-shaoge May 19, 2022
3c128b4
add failpoint check_store_type_of_batch_cop_task
guo-shaoge May 19, 2022
56d327b
Merge branch 'master' of github.com:pingcap/tidb into random_dispatch…
guo-shaoge May 20, 2022
33af6b0
fix partition table dispatching
guo-shaoge May 22, 2022
d5b21de
fix go.mod
guo-shaoge May 23, 2022
7e851b6
fix
guo-shaoge May 23, 2022
706dfdf
fix comment
guo-shaoge May 25, 2022
9ab9afc
refresh tiflash_mpp store cache every 30 seconds
guo-shaoge May 25, 2022
eca3e60
fix
guo-shaoge May 25, 2022
e912a64
fix lint
guo-shaoge May 26, 2022
186a7a8
Merge branch 'master' of github.com:pingcap/tidb into random_dispatch…
guo-shaoge Aug 2, 2022
dbaeaa6
fix ignoring tiflash_mpp when booststraping
guo-shaoge Aug 2, 2022
c54607a
Merge branch 'master' of github.com:pingcap/tidb into random_dispatch…
guo-shaoge Aug 21, 2022
d5e083c
change MPPTask dispatch from ConsistentHash(regionID) to ConsistentHa…
guo-shaoge Sep 20, 2022
a572425
Merge branch 'master' of github.com:pingcap/tidb into random_dispatch…
guo-shaoge Sep 20, 2022
a1def16
fix fmt
guo-shaoge Sep 20, 2022
34a2640
using config(disaggregated_tiflash) instead of isolation_read_engines
guo-shaoge Sep 21, 2022
d05c46c
1. delete usage of kv.TiFlashMPP
guo-shaoge Sep 21, 2022
99e6f32
fix go.mod
guo-shaoge Sep 21, 2022
639585f
fix config fmt
guo-shaoge Sep 21, 2022
21d3d0a
fix config fmt
guo-shaoge Sep 21, 2022
aab9912
fix
guo-shaoge Sep 22, 2022
d9c8e50
Merge branch 'master' of github.com:pingcap/tidb into random_dispatch…
guo-shaoge Sep 23, 2022
c19d5ba
fix bazel_lint
guo-shaoge Sep 23, 2022
1435327
change all tiflash_mpp to tiflash_compute
guo-shaoge Oct 11, 2022
dadf850
Merge branch 'master' of github.com:pingcap/tidb into random_dispatch…
guo-shaoge Oct 11, 2022
361d7a8
fix fmt
guo-shaoge Oct 11, 2022
c01ec1f
fix go.mod
guo-shaoge Oct 13, 2022
503a29a
Merge branch 'master' of github.com:pingcap/tidb into random_dispatch…
guo-shaoge Oct 13, 2022
67e014d
fix config def value
guo-shaoge Oct 16, 2022
2ece861
disable cop and batch cop in Disaggregated Tiflash
guo-shaoge Nov 20, 2022
578f487
Merge branch 'master' of github.com:pingcap/tidb into random_dispatch…
guo-shaoge Dec 6, 2022
8ffe715
fix fmt
guo-shaoge Dec 6, 2022
bdd16e5
update bazel
guo-shaoge Dec 6, 2022
f7e278a
add case
guo-shaoge Dec 8, 2022
8d592b8
update domain.go add recover log
guo-shaoge Dec 8, 2022
a51ecf6
update err msg
guo-shaoge Dec 8, 2022
835acbf
Merge branch 'master' into random_dispatch_mpp_req
guo-shaoge Dec 8, 2022
d2cb9c0
update case
guo-shaoge Dec 8, 2022
fa6e7e3
Merge branch 'master' into random_dispatch_mpp_req
guo-shaoge Dec 8, 2022
77ca039
update bazel
guo-shaoge Dec 8, 2022
4aa16d9
Merge branch 'random_dispatch_mpp_req' of github.com:guo-shaoge/tidb …
guo-shaoge Dec 8, 2022
c7726a4
fix prunt path logic
guo-shaoge Dec 8, 2022
a50994d
Merge branch 'random_dispatch_mpp_req' of github.com:guo-shaoge/tidb …
guo-shaoge Dec 8, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ type Config struct {
Plugin Plugin `toml:"plugin" json:"plugin"`
MaxServerConnections uint32 `toml:"max-server-connections" json:"max-server-connections"`
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
DisaggregatedTiFlash bool `toml:"disaggregated-tiflash" json:"disaggregated-tiflash"`
// TiDBMaxReuseChunk indicates max cached chunk num
TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"`
// TiDBMaxReuseColumn indicates max cached column num
Expand Down Expand Up @@ -988,6 +989,7 @@ var defaultConf = Config{
NewCollationsEnabledOnFirstBootstrap: true,
EnableGlobalKill: true,
TrxSummary: DefaultTrxSummary(),
DisaggregatedTiFlash: false,
TiDBMaxReuseChunk: 64,
TiDBMaxReuseColumn: 256,
}
Expand Down
64 changes: 62 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import (
"github.com/pingcap/tidb/util/memoryusagealarm"
"github.com/pingcap/tidb/util/servermemorylimit"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/transaction"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -1416,6 +1417,64 @@ func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error {
return nil
}

// WatchTiFlashComputeNodeChange create a routine to watch if the topology of tiflash_compute node is changed.
// TODO: tiflashComputeNodeKey is not put to etcd yet(finish this when AutoScaler is done)
//
// store cache will only be invalidated every 30 seconds.
func (do *Domain) WatchTiFlashComputeNodeChange() error {
var watchCh clientv3.WatchChan
if do.etcdClient != nil {
watchCh = do.etcdClient.Watch(context.Background(), tiflashComputeNodeKey)
}
do.wg.Add(1)
duration := 10 * time.Second
go func() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("WatchTiFlashComputeNodeChange exit")
util.Recover(metrics.LabelDomain, "WatchTiFlashComputeNodeChange", nil, false)
}()

var count int
var logCount int
for {
ok := true
var watched bool
select {
case <-do.exit:
return
case _, ok = <-watchCh:
watched = true
case <-time.After(duration):
}
if !ok {
logutil.BgLogger().Error("WatchTiFlashComputeNodeChange watch channel closed")
watchCh = do.etcdClient.Watch(context.Background(), tiflashComputeNodeKey)
count++
if count > 10 {
time.Sleep(time.Duration(count) * time.Second)
}
continue
}
count = 0
switch s := do.store.(type) {
case tikv.Storage:
logCount++
s.GetRegionCache().InvalidateTiFlashComputeStores()
if logCount == 60 {
// Print log every 60*duration seconds.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to print the log every 60s? Or you mean 600s?

logutil.BgLogger().Debug("tiflash_compute store cache invalied, will update next query", zap.Bool("watched", watched))
logCount = 0
}
default:
logutil.BgLogger().Debug("No need to watch tiflash_compute store cache for non-tikv store")
return
}
}
}()
return nil
}

// PrivilegeHandle returns the MySQLPrivilege.
func (do *Domain) PrivilegeHandle() *privileges.Handle {
return do.privHandle
Expand Down Expand Up @@ -2078,8 +2137,9 @@ func (do *Domain) ServerMemoryLimitHandle() *servermemorylimit.Handle {
}

const (
privilegeKey = "/tidb/privilege"
sysVarCacheKey = "/tidb/sysvars"
privilegeKey = "/tidb/privilege"
sysVarCacheKey = "/tidb/sysvars"
tiflashComputeNodeKey = "/tiflash/new_tiflash_compute_nodes"
)

// NotifyUpdatePrivilege updates privilege key in etcd, TiDB client that watches
Expand Down
25 changes: 25 additions & 0 deletions executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/parser/auth"
Expand Down Expand Up @@ -1278,3 +1279,27 @@ func TestBindingFromHistoryWithTiFlashBindable(t *testing.T) {
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", sql)).Rows()
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]), "can't create binding for query with tiflash engine")
}

func TestDisaggregatedTiFlash(t *testing.T) {
config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = true
})
store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(c1 int)")
tk.MustExec("alter table t set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")

err = tk.ExecToErr("select * from t;")
require.Contains(t, err.Error(), "Please check tiflash_compute node is available")

config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = false
})
tk.MustQuery("select * from t;").Check(testkit.Rows())
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ require (
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
github.com/stathat/consistent v1.0.0
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
Expand Down Expand Up @@ -218,7 +219,6 @@ require (
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stathat/consistent v1.0.0 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
Expand Down
1 change: 1 addition & 0 deletions planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ go_library(
"//sessiontxn/staleread",
"//statistics",
"//statistics/handle",
"//store/driver/backoff",
"//table",
"//table/tables",
"//table/temptable",
Expand Down
4 changes: 3 additions & 1 deletion planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -1985,7 +1986,8 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
}
}
}
if prop.TaskTp == property.MppTaskType {
// In disaggregated tiflash mode, only MPP is allowed, Cop and BatchCop is deprecated.
if prop.TaskTp == property.MppTaskType || config.GetGlobalConfig().DisaggregatedTiFlash && ts.StoreType == kv.TiFlash {
if ts.KeepOrder {
return invalidTask, nil
}
Expand Down
19 changes: 19 additions & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
Expand All @@ -49,6 +50,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/table/temptable"
Expand All @@ -65,6 +67,7 @@ import (
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/size"
"github.com/tikv/client-go/v2/tikv"
)

const (
Expand Down Expand Up @@ -685,6 +688,13 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) {
ds.preferStoreType = 0
return
}
if config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ds.ctx) {
// TiFlash is in disaggregated mode, need to make sure tiflash_compute node is available.
errMsg := "No available tiflash_compute node"
warning := ErrInternal.GenWithStack(errMsg)
ds.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
return
}
for _, path := range ds.possibleAccessPaths {
if path.StoreType == kv.TiFlash {
ds.preferStoreType |= preferTiFlash
Expand All @@ -702,6 +712,15 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) {
}
}

func isTiFlashComputeNodeAvailable(ctx sessionctx.Context) bool {
bo := backoff.NewBackofferWithVars(context.Background(), 5000, nil)
stores, err := ctx.GetStore().(tikv.Storage).GetRegionCache().GetTiFlashComputeStores(bo.TiKVBackoffer())
if err != nil || len(stores) == 0 {
return false
}
return true
}

func resetNotNullFlag(schema *expression.Schema, start, end int) {
for i := start; i < end; i++ {
col := *schema.Columns[i]
Expand Down
19 changes: 17 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1392,15 +1392,26 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath,
isolationReadEngines := ctx.GetSessionVars().GetIsolationReadEngines()
availableEngine := map[kv.StoreType]struct{}{}
var availableEngineStr string
var outputComputeNodeErrMsg bool
noTiFlashComputeNode := config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ctx)
for i := len(paths) - 1; i >= 0; i-- {
// availableEngineStr is for warning message.
if _, ok := availableEngine[paths[i].StoreType]; !ok {
availableEngine[paths[i].StoreType] = struct{}{}
if availableEngineStr != "" {
availableEngineStr += ", "
}
availableEngineStr += paths[i].StoreType.Name()
}
if _, ok := isolationReadEngines[paths[i].StoreType]; !ok && paths[i].StoreType != kv.TiDB {
_, exists := isolationReadEngines[paths[i].StoreType]
// Prune this path if:
// 1. path.StoreType doesn't exists in isolationReadEngines or
// 2. TiFlash is disaggregated and the number of tiflash_compute node is zero.
shouldPruneTiFlashCompute := noTiFlashComputeNode && exists && paths[i].StoreType == kv.TiFlash
if shouldPruneTiFlashCompute {
outputComputeNodeErrMsg = true
}
if (!exists && paths[i].StoreType != kv.TiDB) || shouldPruneTiFlashCompute {
paths = append(paths[:i], paths[i+1:]...)
}
}
Expand All @@ -1409,7 +1420,11 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath,
if len(paths) == 0 {
helpMsg := ""
if engineVals == "tiflash" {
helpMsg = ". Please check tiflash replica or ensure the query is readonly"
if outputComputeNodeErrMsg {
helpMsg = ". Please check tiflash_compute node is available"
} else {
helpMsg = ". Please check tiflash replica or ensure the query is readonly"
}
}
err = ErrInternal.GenWithStackByArgs(fmt.Sprintf("No access path for table '%s' is found with '%v' = '%v', valid values can be '%s'%s.", tblName.String(),
variable.TiDBIsolationReadEngines, engineVals, availableEngineStr, helpMsg))
Expand Down
4 changes: 4 additions & 0 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1977,6 +1977,10 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task {
}
attachPlan2Task(proj, newMpp)
return newMpp
case NoMpp:
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
t = mpp.convertToRootTask(p.ctx)
attachPlan2Task(p, t)
return t
default:
return invalidTask
}
Expand Down
8 changes: 8 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3018,6 +3018,14 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
return nil, err
}

if config.GetGlobalConfig().DisaggregatedTiFlash {
// Invalid client-go tiflash_compute store cache if necessary.
err = dom.WatchTiFlashComputeNodeChange()
if err != nil {
return nil, err
}
}

if err = extensionimpl.Bootstrap(context.Background(), dom); err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_pingcap_log//:log",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_stathat_consistent//:consistent",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//metrics",
Expand Down Expand Up @@ -75,6 +76,7 @@ go_test(
"//testkit/testsetup",
"//util/paging",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_stathat_consistent//:consistent",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//testutils",
Expand Down
Loading