From 05c231b67954421f7dc69cfce98c65460ba44945 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sun, 29 Jan 2023 16:43:47 +0800 Subject: [PATCH 01/11] planner: fix HashAgg cannot pushdown to tiflash_compute Signed-off-by: guo-shaoge --- planner/core/find_best_task.go | 5 ++-- planner/core/logical_plan_builder.go | 7 ------ planner/core/task.go | 37 ++++++++++++++++++++++++++++ 3 files changed, 40 insertions(+), 9 deletions(-) diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index c2d3e1a62d7bc..804bfec327da7 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -2012,13 +2012,14 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid } // In disaggregated tiflash mode, only MPP is allowed, cop and batchCop is deprecated. // So if prop.TaskTp is RootTaskType, have to use mppTask then convert to rootTask. - isDisaggregatedTiFlashPath := config.GetGlobalConfig().DisaggregatedTiFlash && ts.StoreType == kv.TiFlash + isDisaggregatedTiFlash := config.GetGlobalConfig().DisaggregatedTiFlash + isDisaggregatedTiFlashPath := isDisaggregatedTiFlash && ts.StoreType == kv.TiFlash canMppConvertToRootForDisaggregatedTiFlash := isDisaggregatedTiFlashPath && prop.TaskTp == property.RootTaskType && ds.SCtx().GetSessionVars().IsMPPAllowed() if prop.TaskTp == property.MppTaskType || canMppConvertToRootForDisaggregatedTiFlash { if ts.KeepOrder { return invalidTask, nil } - if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !canMppConvertToRootForDisaggregatedTiFlash) { + if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !isDisaggregatedTiFlash) { // If ts is a single partition, then this partition table is in static-only prune, then we should not choose mpp execution. // But in disaggregated tiflash mode, we can only use mpp, so we add ExchangeSender and ExchangeReceiver above TableScan for static pruning partition table. 
ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.") diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 2d73534fc2e1e..b3575b593efa0 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -50,7 +50,6 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" - "github.com/pingcap/tidb/store/driver/backoff" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/table/temptable" @@ -67,7 +66,6 @@ import ( "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/size" - "github.com/tikv/client-go/v2/tikv" ) const ( @@ -717,11 +715,6 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) { } func isTiFlashComputeNodeAvailable(ctx sessionctx.Context) bool { - bo := backoff.NewBackofferWithVars(context.Background(), 5000, nil) - stores, err := ctx.GetStore().(tikv.Storage).GetRegionCache().GetTiFlashComputeStores(bo.TiKVBackoffer()) - if err != nil || len(stores) == 0 { - return false - } return true } diff --git a/planner/core/task.go b/planner/core/task.go index 5d7ca6e5fd424..ba77c48261881 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1820,6 +1820,9 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task { } switch p.MppRunMode { case Mpp1Phase: + if !aggFuncModeSame(p) { + return invalidTask + } // 1-phase agg: when the partition columns can be satisfied, where the plan does not need to enforce Exchange // only push down the original agg proj := p.convertAvgForMPP() @@ -1835,6 +1838,9 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task { if partialAgg == nil { return invalidTask } + if !aggFuncModeSame(finalAgg) { + return invalidTask + } attachPlan2Task(partialAgg, mpp) partitionCols := p.MppPartitionCols if len(partitionCols) == 0 { @@ -1882,6 +1888,9 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task { if finalAgg == nil { return invalidTask } + if !aggFuncModeSame(finalAgg) { + return invalidTask + } // generate 3 stage aggregation for single count distinct if applicable. 
// select count(distinct a), count(b) from foo @@ -1987,6 +1996,34 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task { } } +func aggFuncModeSame(p PhysicalPlan) bool { + funcs := make([]*aggregation.AggFuncDesc, 0, 8) + sa, ok := p.(*PhysicalStreamAgg) + if ok { + for _, f := range sa.AggFuncs { + funcs = append(funcs, f) + } + } else { + ha, ok := p.(*PhysicalHashAgg) + if !ok { + return false + } + for _, f := range ha.AggFuncs { + funcs = append(funcs, f) + } + } + if len(funcs) == 0 { + return true + } + expFuncMode := funcs[0].Mode + for _, f := range funcs { + if f.Mode != expFuncMode { + return false + } + } + return true +} + func (p *PhysicalHashAgg) attach2Task(tasks ...task) task { t := tasks[0].copy() final := p From 34a3be31903b9e4f57ba734eb5211e057813cbd6 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 30 Jan 2023 11:49:34 +0800 Subject: [PATCH 02/11] disable PartitionUnion pushdown tiflash explicitly Signed-off-by: guo-shaoge --- planner/core/BUILD.bazel | 1 - planner/core/physical_plan_test.go | 73 +++++++++++++++++++++++ planner/core/task.go | 9 +++ planner/core/testdata/plan_suite_in.json | 7 +++ planner/core/testdata/plan_suite_out.json | 63 +++++++++++++++++++ 5 files changed, 152 insertions(+), 1 deletion(-) diff --git a/planner/core/BUILD.bazel b/planner/core/BUILD.bazel index 3afbdf3b8a0bc..12c2730c59364 100644 --- a/planner/core/BUILD.bazel +++ b/planner/core/BUILD.bazel @@ -107,7 +107,6 @@ go_library( "//sessiontxn/staleread", "//statistics", "//statistics/handle", - "//store/driver/backoff", "//table", "//table/tables", "//table/temptable", diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go index 640c0f04630c1..a8a7a1918cfc0 100644 --- a/planner/core/physical_plan_test.go +++ b/planner/core/physical_plan_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/infoschema" @@ -2626,3 +2627,75 @@ func TestCountStarForTiFlash(t *testing.T) { tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...)) } } + +func TestHashAggPushdownToTiFlashCompute(t *testing.T) { + var ( + input []string + output []struct { + SQL string + Plan []string + Warning []string + } + ) + planSuiteData := core.GetPlanSuiteData() + planSuiteData.LoadTestCases(t, &input, &output) + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists tbl_15;") + tk.MustExec(`create table tbl_15 (col_89 text (473) collate utf8mb4_bin , + col_90 timestamp default '1976-04-03' , + col_91 tinyint unsigned not null , + col_92 tinyint , + col_93 double not null , + col_94 datetime not null default '1970-06-08' , + col_95 datetime default '2028-02-13' , + col_96 int unsigned not null default 2532480521 , + col_97 char (168) default '') partition by hash (col_91) partitions 4;`) + + tk.MustExec("drop table if exists tbl_16;") + tk.MustExec(`create table tbl_16 (col_98 text (246) not null , + col_99 decimal (30 ,19) , + col_100 mediumint unsigned , + col_101 text (410) collate utf8mb4_bin , + col_102 date not null , + col_103 timestamp not null default '2003-08-27' , + col_104 text (391) not null , + col_105 date default '2010-10-24' , + col_106 text (9) not null,primary key (col_100, col_98(5), col_103), + unique key idx_23 (col_100, col_106 (3), col_101 (3))) 
partition by hash (col_100) partitions 2;`) + + config.UpdateGlobal(func(conf *config.Config) { + conf.DisaggregatedTiFlash = true + }) + defer config.UpdateGlobal(func(conf *config.Config) { + conf.DisaggregatedTiFlash = false + }) + + dom := domain.GetDomain(tk.Session()) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + require.True(t, exists) + for _, tblInfo := range db.Tables { + tableName := tblInfo.Name.L + if tableName == "tbl_15" || tableName == "tbl_16" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1;") + tk.MustExec("set @@tidb_partition_prune_mode = 'static';") + tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash';") + + for i, ts := range input { + testdata.OnRecord(func() { + output[i].SQL = ts + output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + ts).Rows()) + }) + tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...)) + } +} diff --git a/planner/core/task.go b/planner/core/task.go index ba77c48261881..8f73488381a06 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1161,6 +1161,14 @@ func (p *PhysicalUnionAll) attach2MppTasks(tasks ...task) task { } func (p *PhysicalUnionAll) attach2Task(tasks ...task) task { + for _, t := range tasks { + if _, ok := t.(*mppTask); ok && p.TP() == plancodec.TypePartitionUnion { + // In attach2MppTasks(), will attach Union to mppTask directly. + // But PartitionUnion cannot pushdown to tiflash, so disable PartitionUnion pushdown to tiflash explicitly. + // TODO: Let mppTask convertToRootTask instread of return invalidTask directly. + return invalidTask + } + } for _, t := range tasks { if _, ok := t.(*mppTask); ok { return p.attach2MppTasks(tasks...) @@ -1820,6 +1828,7 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task { } switch p.MppRunMode { case Mpp1Phase: + // TiFlash will check if agg func mode are all same. Otherwise will give error. 
if !aggFuncModeSame(p) { return invalidTask } diff --git a/planner/core/testdata/plan_suite_in.json b/planner/core/testdata/plan_suite_in.json index d433f5dd88dbe..ee95b65bd469e 100644 --- a/planner/core/testdata/plan_suite_in.json +++ b/planner/core/testdata/plan_suite_in.json @@ -1186,5 +1186,12 @@ "select a, count(*) from t group by a -- shouldn't be rewritten", "select sum(a) from t -- sum shouldn't be rewritten" ] + }, + { + "name": "TestHashAggPushdownToTiFlashCompute", + "cases": [ + "select /*+ agg_to_cop() hash_agg() */ avg( distinct tbl_15.col_96 ) as r0 , min( tbl_15.col_92 ) as r1 , sum( distinct tbl_15.col_91 ) as r2 , max( tbl_15.col_92 ) as r3 from tbl_15 where tbl_15.col_94 != '2033-01-09' and tbl_15.col_93 > 7623.679908049186 order by r0,r1,r2,r3 limit 79 ;", + "select /*+ agg_to_cop() stream_agg() */ avg( tbl_16.col_100 ) as r0 from tbl_16 where tbl_16.col_100 in ( 10672141 ) or tbl_16.col_104 in ( 'yfEG1t!*b' ,'C1*bqx_qyO' ,'vQ^yUpKHr&j#~' ) group by tbl_16.col_100 order by r0 limit 20 ;" + ] } ] diff --git a/planner/core/testdata/plan_suite_out.json b/planner/core/testdata/plan_suite_out.json index 31964823e95f2..e8a4552f0fba0 100644 --- a/planner/core/testdata/plan_suite_out.json +++ b/planner/core/testdata/plan_suite_out.json @@ -7501,5 +7501,68 @@ "Warning": null } ] + }, + { + "Name": "TestHashAggPushdownToTiFlashCompute", + "Cases": [ + { + "SQL": "select /*+ agg_to_cop() hash_agg() */ avg( distinct tbl_15.col_96 ) as r0 , min( tbl_15.col_92 ) as r1 , sum( distinct tbl_15.col_91 ) as r2 , max( tbl_15.col_92 ) as r3 from tbl_15 where tbl_15.col_94 != '2033-01-09' and tbl_15.col_93 > 7623.679908049186 order by r0,r1,r2,r3 limit 79 ;", + "Plan": [ + "Limit 1.00 root offset:0, count:79", + "└─Sort 1.00 root Column#11, Column#12, Column#13, Column#14", + " └─HashAgg 1.00 root funcs:avg(distinct Column#71)->Column#11, funcs:min(Column#72)->Column#12, funcs:sum(distinct Column#73)->Column#13, funcs:max(Column#74)->Column#14", + " └─Projection 7100.44 root cast(test.tbl_15.col_96, decimal(10,0) UNSIGNED BINARY)->Column#71, Column#15, cast(test.tbl_15.col_91, decimal(3,0) UNSIGNED BINARY)->Column#73, Column#16", + " └─PartitionUnion 7100.44 root ", + " ├─HashAgg 1775.11 root group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#24)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#26)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " │ └─TableReader 1775.11 root data:ExchangeSender", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#24, funcs:max(test.tbl_15.col_92)->Column#26", + " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p0 keep order:false, stats:pseudo", + " ├─HashAgg 1775.11 root group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#36)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#38)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " │ └─TableReader 1775.11 root data:ExchangeSender", + " │ └─ExchangeSender 
1775.11 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#36, funcs:max(test.tbl_15.col_92)->Column#38", + " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p1 keep order:false, stats:pseudo", + " ├─HashAgg 1775.11 root group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#48)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#50)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " │ └─TableReader 1775.11 root data:ExchangeSender", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#48, funcs:max(test.tbl_15.col_92)->Column#50", + " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p2 keep order:false, stats:pseudo", + " └─HashAgg 1775.11 root group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#60)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#62)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " └─TableReader 1775.11 root data:ExchangeSender", + " └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#60, funcs:max(test.tbl_15.col_92)->Column#62", + " └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p3 keep order:false, stats:pseudo" + ], + "Warning": null + }, + { + "SQL": "select /*+ agg_to_cop() stream_agg() */ avg( tbl_16.col_100 ) as r0 from tbl_16 where tbl_16.col_100 in ( 10672141 ) or tbl_16.col_104 in ( 'yfEG1t!*b' ,'C1*bqx_qyO' ,'vQ^yUpKHr&j#~' ) group by tbl_16.col_100 order by r0 limit 20 ;", + "Plan": [ + "TopN 20.00 root Column#10, offset:0, count:20", + "└─HashAgg 63.95 root group by:test.tbl_16.col_100, funcs:avg(Column#11, Column#12)->Column#10", + " └─PartitionUnion 63.95 root ", + " ├─StreamAgg 31.98 root group by:Column#19, funcs:count(Column#16)->Column#11, funcs:sum(Column#17)->Column#12, funcs:firstrow(Column#18)->test.tbl_16.col_100", + " │ └─Projection 39.97 root test.tbl_16.col_100, cast(test.tbl_16.col_100, decimal(8,0) UNSIGNED BINARY)->Column#17, test.tbl_16.col_100, test.tbl_16.col_100", + " │ └─Sort 39.97 root test.tbl_16.col_100", + " │ └─TableReader 39.97 root data:ExchangeSender", + " │ └─ExchangeSender 39.97 mpp[tiflash] ExchangeType: PassThrough", + " │ └─Selection 39.97 mpp[tiflash] or(eq(test.tbl_16.col_100, 10672141), in(test.tbl_16.col_104, \"yfEG1t!*b\", \"C1*bqx_qyO\", \"vQ^yUpKHr&j#~\"))", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_16, partition:p0 keep order:false, stats:pseudo", + " └─StreamAgg 31.98 root group by:Column#23, 
funcs:count(Column#20)->Column#11, funcs:sum(Column#21)->Column#12, funcs:firstrow(Column#22)->test.tbl_16.col_100", + " └─Projection 39.97 root test.tbl_16.col_100, cast(test.tbl_16.col_100, decimal(8,0) UNSIGNED BINARY)->Column#21, test.tbl_16.col_100, test.tbl_16.col_100", + " └─Sort 39.97 root test.tbl_16.col_100", + " └─TableReader 39.97 root data:ExchangeSender", + " └─ExchangeSender 39.97 mpp[tiflash] ExchangeType: PassThrough", + " └─Selection 39.97 mpp[tiflash] or(eq(test.tbl_16.col_100, 10672141), in(test.tbl_16.col_104, \"yfEG1t!*b\", \"C1*bqx_qyO\", \"vQ^yUpKHr&j#~\"))", + " └─TableFullScan 10000.00 mpp[tiflash] table:tbl_16, partition:p1 keep order:false, stats:pseudo" + ], + "Warning": null + } + ] } ] From 44e887eba96f5a21d11e4ab5c57b956727021e1c Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 30 Jan 2023 11:54:37 +0800 Subject: [PATCH 03/11] remove isTiFlashComputeNodeAvailable Signed-off-by: guo-shaoge --- planner/core/logical_plan_builder.go | 12 ------------ planner/core/planbuilder.go | 23 ++--------------------- 2 files changed, 2 insertions(+), 33 deletions(-) diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index b3575b593efa0..f6e566bac43a3 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" @@ -690,13 +689,6 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) { ds.preferStoreType = 0 return } - if config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ds.ctx) { - // TiFlash is in disaggregated mode, need to make sure tiflash_compute node is available. - errMsg := "No available tiflash_compute node" - warning := ErrInternal.GenWithStack(errMsg) - ds.ctx.GetSessionVars().StmtCtx.AppendWarning(warning) - return - } for _, path := range ds.possibleAccessPaths { if path.StoreType == kv.TiFlash { ds.preferStoreType |= preferTiFlash @@ -714,10 +706,6 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) { } } -func isTiFlashComputeNodeAvailable(ctx sessionctx.Context) bool { - return true -} - func resetNotNullFlag(schema *expression.Schema, start, end int) { for i := start; i < end; i++ { col := *schema.Columns[i] diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 5af0bb004d6c9..451d63b23ec00 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1453,8 +1453,6 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath, isolationReadEngines := ctx.GetSessionVars().GetIsolationReadEngines() availableEngine := map[kv.StoreType]struct{}{} var availableEngineStr string - var outputComputeNodeErrMsg bool - noTiFlashComputeNode := config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ctx) for i := len(paths) - 1; i >= 0; i-- { // availableEngineStr is for warning message. if _, ok := availableEngine[paths[i].StoreType]; !ok { @@ -1464,20 +1462,7 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath, } availableEngineStr += paths[i].StoreType.Name() } - _, exists := isolationReadEngines[paths[i].StoreType] - // Prune this path if: - // 1. path.StoreType doesn't exists in isolationReadEngines or - // 2. 
TiFlash is disaggregated and the number of tiflash_compute node is zero. - shouldPruneTiFlashCompute := noTiFlashComputeNode && exists && paths[i].StoreType == kv.TiFlash - failpoint.Inject("testDisaggregatedTiFlashQuery", func(val failpoint.Value) { - // Ignore check if tiflash_compute node number. - // After we support disaggregated tiflash in test framework, can delete this failpoint. - shouldPruneTiFlashCompute = val.(bool) - }) - if shouldPruneTiFlashCompute { - outputComputeNodeErrMsg = true - } - if (!exists && paths[i].StoreType != kv.TiDB) || shouldPruneTiFlashCompute { + if _, ok := isolationReadEngines[paths[i].StoreType]; !ok && paths[i].StoreType != kv.TiDB { paths = append(paths[:i], paths[i+1:]...) } } @@ -1486,11 +1471,7 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath, if len(paths) == 0 { helpMsg := "" if engineVals == "tiflash" { - if outputComputeNodeErrMsg { - helpMsg = ". Please check tiflash_compute node is available" - } else { - helpMsg = ". Please check tiflash replica or ensure the query is readonly" - } + helpMsg = ". Please check tiflash replica or ensure the query is readonly" } err = ErrInternal.GenWithStackByArgs(fmt.Sprintf("No access path for table '%s' is found with '%v' = '%v', valid values can be '%s'%s.", tblName.String(), variable.TiDBIsolationReadEngines, engineVals, availableEngineStr, helpMsg)) From 4954157f6c48c5aa0af77cb89502770d84f99b11 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 30 Jan 2023 11:59:03 +0800 Subject: [PATCH 04/11] fix typo Signed-off-by: guo-shaoge --- planner/core/task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/planner/core/task.go b/planner/core/task.go index 8f73488381a06..88596540917c4 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1165,7 +1165,7 @@ func (p *PhysicalUnionAll) attach2Task(tasks ...task) task { if _, ok := t.(*mppTask); ok && p.TP() == plancodec.TypePartitionUnion { // In attach2MppTasks(), will attach Union to mppTask directly. // But PartitionUnion cannot pushdown to tiflash, so disable PartitionUnion pushdown to tiflash explicitly. - // TODO: Let mppTask convertToRootTask instread of return invalidTask directly. + // TODO: Let mppTask convertToRootTask instead of return invalidTask directly. 
return invalidTask } } From f22592c1c2b9368d41cb88f570a4909aba94b079 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 30 Jan 2023 12:15:45 +0800 Subject: [PATCH 05/11] fix import Signed-off-by: guo-shaoge --- planner/core/planbuilder.go | 1 - 1 file changed, 1 deletion(-) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index b9bcbf1bca22a..5535faa97ab92 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -26,7 +26,6 @@ import ( "unsafe" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" From 2b7d952c540855cac26044e637b54a3875e7af28 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 30 Jan 2023 14:17:54 +0800 Subject: [PATCH 06/11] fix lint Signed-off-by: guo-shaoge --- planner/core/task.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/planner/core/task.go b/planner/core/task.go index 88596540917c4..7f7594cdec599 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -2009,17 +2009,13 @@ func aggFuncModeSame(p PhysicalPlan) bool { funcs := make([]*aggregation.AggFuncDesc, 0, 8) sa, ok := p.(*PhysicalStreamAgg) if ok { - for _, f := range sa.AggFuncs { - funcs = append(funcs, f) - } + funcs = append(funcs, sa.AggFuncs...) } else { ha, ok := p.(*PhysicalHashAgg) if !ok { return false } - for _, f := range ha.AggFuncs { - funcs = append(funcs, f) - } + funcs = append(funcs, ha.AggFuncs...) } if len(funcs) == 0 { return true From 0d8574d7192b357c4b76c2ffef61e785a9fec6de Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 30 Jan 2023 18:43:20 +0800 Subject: [PATCH 07/11] fix different agg mode Signed-off-by: guo-shaoge --- planner/core/rule_aggregation_push_down.go | 1 + planner/core/task.go | 34 ----------- planner/core/testdata/plan_suite_out.json | 68 ++++++++++++---------- 3 files changed, 39 insertions(+), 64 deletions(-) diff --git a/planner/core/rule_aggregation_push_down.go b/planner/core/rule_aggregation_push_down.go index c9326929b550f..8663f43feeb74 100644 --- a/planner/core/rule_aggregation_push_down.go +++ b/planner/core/rule_aggregation_push_down.go @@ -405,6 +405,7 @@ func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, u if err != nil { return nil, err } + firstRow.Mode = aggregation.Partial1Mode newAgg.AggFuncs = append(newAgg.AggFuncs, firstRow) } tmpSchema := expression.NewSchema(newAgg.GetGroupByCols()...) diff --git a/planner/core/task.go b/planner/core/task.go index 7f7594cdec599..29b308b3db02d 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1828,10 +1828,6 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task { } switch p.MppRunMode { case Mpp1Phase: - // TiFlash will check if agg func mode are all same. Otherwise will give error. 
- if !aggFuncModeSame(p) { - return invalidTask - } // 1-phase agg: when the partition columns can be satisfied, where the plan does not need to enforce Exchange // only push down the original agg proj := p.convertAvgForMPP() @@ -1847,9 +1843,6 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task { if partialAgg == nil { return invalidTask } - if !aggFuncModeSame(finalAgg) { - return invalidTask - } attachPlan2Task(partialAgg, mpp) partitionCols := p.MppPartitionCols if len(partitionCols) == 0 { @@ -1897,9 +1890,6 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task { if finalAgg == nil { return invalidTask } - if !aggFuncModeSame(finalAgg) { - return invalidTask - } // generate 3 stage aggregation for single count distinct if applicable. // select count(distinct a), count(b) from foo @@ -2005,30 +1995,6 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task { } } -func aggFuncModeSame(p PhysicalPlan) bool { - funcs := make([]*aggregation.AggFuncDesc, 0, 8) - sa, ok := p.(*PhysicalStreamAgg) - if ok { - funcs = append(funcs, sa.AggFuncs...) - } else { - ha, ok := p.(*PhysicalHashAgg) - if !ok { - return false - } - funcs = append(funcs, ha.AggFuncs...) - } - if len(funcs) == 0 { - return true - } - expFuncMode := funcs[0].Mode - for _, f := range funcs { - if f.Mode != expFuncMode { - return false - } - } - return true -} - func (p *PhysicalHashAgg) attach2Task(tasks ...task) task { t := tasks[0].copy() final := p diff --git a/planner/core/testdata/plan_suite_out.json b/planner/core/testdata/plan_suite_out.json index e8a4552f0fba0..e2e9a184d2c3d 100644 --- a/planner/core/testdata/plan_suite_out.json +++ b/planner/core/testdata/plan_suite_out.json @@ -7510,33 +7510,41 @@ "Plan": [ "Limit 1.00 root offset:0, count:79", "└─Sort 1.00 root Column#11, Column#12, Column#13, Column#14", - " └─HashAgg 1.00 root funcs:avg(distinct Column#71)->Column#11, funcs:min(Column#72)->Column#12, funcs:sum(distinct Column#73)->Column#13, funcs:max(Column#74)->Column#14", - " └─Projection 7100.44 root cast(test.tbl_15.col_96, decimal(10,0) UNSIGNED BINARY)->Column#71, Column#15, cast(test.tbl_15.col_91, decimal(3,0) UNSIGNED BINARY)->Column#73, Column#16", + " └─HashAgg 1.00 root funcs:avg(distinct Column#89)->Column#11, funcs:min(Column#90)->Column#12, funcs:sum(distinct Column#91)->Column#13, funcs:max(Column#92)->Column#14", + " └─Projection 7100.44 root cast(test.tbl_15.col_96, decimal(10,0) UNSIGNED BINARY)->Column#89, Column#15, cast(test.tbl_15.col_91, decimal(3,0) UNSIGNED BINARY)->Column#91, Column#16", " └─PartitionUnion 7100.44 root ", - " ├─HashAgg 1775.11 root group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#24)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#26)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", - " │ └─TableReader 1775.11 root data:ExchangeSender", - " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", - " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#24, funcs:max(test.tbl_15.col_92)->Column#26", - " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", - " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p0 keep order:false, stats:pseudo", - " ├─HashAgg 1775.11 root group 
by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#36)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#38)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", - " │ └─TableReader 1775.11 root data:ExchangeSender", - " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", - " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#36, funcs:max(test.tbl_15.col_92)->Column#38", - " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", - " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p1 keep order:false, stats:pseudo", - " ├─HashAgg 1775.11 root group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#48)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#50)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", - " │ └─TableReader 1775.11 root data:ExchangeSender", - " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", - " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#48, funcs:max(test.tbl_15.col_92)->Column#50", - " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", - " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p2 keep order:false, stats:pseudo", - " └─HashAgg 1775.11 root group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#60)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#62)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", - " └─TableReader 1775.11 root data:ExchangeSender", - " └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", - " └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#60, funcs:max(test.tbl_15.col_92)->Column#62", - " └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", - " └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p3 keep order:false, stats:pseudo" + " ├─TableReader 1775.11 root data:ExchangeSender", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#18)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#20)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " │ └─ExchangeReceiver 1775.11 mpp[tiflash] ", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tbl_15.col_96, collate: binary], [name: test.tbl_15.col_91, collate: binary]", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#18, 
funcs:max(test.tbl_15.col_92)->Column#20", + " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p0 keep order:false, stats:pseudo", + " ├─TableReader 1775.11 root data:ExchangeSender", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#30)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#32)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " │ └─ExchangeReceiver 1775.11 mpp[tiflash] ", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tbl_15.col_96, collate: binary], [name: test.tbl_15.col_91, collate: binary]", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#30, funcs:max(test.tbl_15.col_92)->Column#32", + " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p1 keep order:false, stats:pseudo", + " ├─TableReader 1775.11 root data:ExchangeSender", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#42)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#44)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " │ └─ExchangeReceiver 1775.11 mpp[tiflash] ", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tbl_15.col_96, collate: binary], [name: test.tbl_15.col_91, collate: binary]", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#42, funcs:max(test.tbl_15.col_92)->Column#44", + " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p2 keep order:false, stats:pseudo", + " └─TableReader 1775.11 root data:ExchangeSender", + " └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#54)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#56)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " └─ExchangeReceiver 1775.11 mpp[tiflash] ", + " └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tbl_15.col_96, collate: binary], [name: test.tbl_15.col_91, collate: binary]", + " └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#54, funcs:max(test.tbl_15.col_92)->Column#56", + " └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 
00:00:00.000000)", + " └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p3 keep order:false, stats:pseudo" ], "Warning": null }, @@ -7546,15 +7554,15 @@ "TopN 20.00 root Column#10, offset:0, count:20", "└─HashAgg 63.95 root group by:test.tbl_16.col_100, funcs:avg(Column#11, Column#12)->Column#10", " └─PartitionUnion 63.95 root ", - " ├─StreamAgg 31.98 root group by:Column#19, funcs:count(Column#16)->Column#11, funcs:sum(Column#17)->Column#12, funcs:firstrow(Column#18)->test.tbl_16.col_100", - " │ └─Projection 39.97 root test.tbl_16.col_100, cast(test.tbl_16.col_100, decimal(8,0) UNSIGNED BINARY)->Column#17, test.tbl_16.col_100, test.tbl_16.col_100", + " ├─StreamAgg 31.98 root group by:Column#22, funcs:count(Column#19)->Column#11, funcs:sum(Column#20)->Column#12, funcs:firstrow(Column#21)->test.tbl_16.col_100", + " │ └─Projection 39.97 root test.tbl_16.col_100, cast(test.tbl_16.col_100, decimal(8,0) UNSIGNED BINARY)->Column#20, test.tbl_16.col_100, test.tbl_16.col_100", " │ └─Sort 39.97 root test.tbl_16.col_100", " │ └─TableReader 39.97 root data:ExchangeSender", " │ └─ExchangeSender 39.97 mpp[tiflash] ExchangeType: PassThrough", " │ └─Selection 39.97 mpp[tiflash] or(eq(test.tbl_16.col_100, 10672141), in(test.tbl_16.col_104, \"yfEG1t!*b\", \"C1*bqx_qyO\", \"vQ^yUpKHr&j#~\"))", " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_16, partition:p0 keep order:false, stats:pseudo", - " └─StreamAgg 31.98 root group by:Column#23, funcs:count(Column#20)->Column#11, funcs:sum(Column#21)->Column#12, funcs:firstrow(Column#22)->test.tbl_16.col_100", - " └─Projection 39.97 root test.tbl_16.col_100, cast(test.tbl_16.col_100, decimal(8,0) UNSIGNED BINARY)->Column#21, test.tbl_16.col_100, test.tbl_16.col_100", + " └─StreamAgg 31.98 root group by:Column#26, funcs:count(Column#23)->Column#11, funcs:sum(Column#24)->Column#12, funcs:firstrow(Column#25)->test.tbl_16.col_100", + " └─Projection 39.97 root test.tbl_16.col_100, cast(test.tbl_16.col_100, decimal(8,0) UNSIGNED BINARY)->Column#24, test.tbl_16.col_100, test.tbl_16.col_100", " └─Sort 39.97 root test.tbl_16.col_100", " └─TableReader 39.97 root data:ExchangeSender", " └─ExchangeSender 39.97 mpp[tiflash] ExchangeType: PassThrough", From c5fc20367f81c79691ab3caa5dc8787613319920 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 30 Jan 2023 19:33:54 +0800 Subject: [PATCH 08/11] fix unit-test Signed-off-by: guo-shaoge --- executor/tiflashtest/tiflash_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go index e8cd94d889188..fec246a5c5057 100644 --- a/executor/tiflashtest/tiflash_test.go +++ b/executor/tiflashtest/tiflash_test.go @@ -1270,7 +1270,7 @@ func TestDisaggregatedTiFlash(t *testing.T) { tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") err = tk.ExecToErr("select * from t;") - require.Contains(t, err.Error(), "Please check tiflash_compute node is available") + require.Contains(t, err.Error(), "tiflash_compute node is unavailable") config.UpdateGlobal(func(conf *config.Config) { conf.DisaggregatedTiFlash = false @@ -1304,9 +1304,6 @@ func TestDisaggregatedTiFlashQuery(t *testing.T) { require.NoError(t, err) tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") - needCheckTiFlashComputeNode := "false" - failpoint.Enable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery", fmt.Sprintf("return(%s)", needCheckTiFlashComputeNode)) - defer 
failpoint.Disable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery") tk.MustExec("explain select max( tbl_1.col_1 ) as r0 , sum( tbl_1.col_1 ) as r1 , sum( tbl_1.col_8 ) as r2 from tbl_1 where tbl_1.col_8 != 68 or tbl_1.col_3 between null and 939 order by r0,r1,r2;") tk.MustExec("set @@tidb_partition_prune_mode = 'static';") From 3b7e4d84ea47782b75c9057ca8b654e54846c2f7 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 30 Jan 2023 20:09:45 +0800 Subject: [PATCH 09/11] refine comments Signed-off-by: guo-shaoge --- planner/core/find_best_task.go | 8 ++++++-- planner/core/task.go | 6 +++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index b1a63031a6fb0..1ecc9f995243c 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -2024,7 +2024,7 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid } if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !isDisaggregatedTiFlash) { // If ts is a single partition, then this partition table is in static-only prune, then we should not choose mpp execution. - // But in disaggregated tiflash mode, we can only use mpp, so we add ExchangeSender and ExchangeReceiver above TableScan for static pruning partition table. + // But in disaggregated tiflash mode, we enable using mpp for static pruning partition table, because cop and batchCop is deprecated. ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.") return invalidTask, nil } @@ -2053,7 +2053,11 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid // So have to return a rootTask, but prop requires mppTask, cannot meet this requirement. task = invalidTask } else if prop.TaskTp == property.RootTaskType { - // when got here, canMppConvertToRootForDisaggregatedTiFlash is true. + // When got here, canMppConvertToRootForDisaggregatedTiFlash is true. + // This is for situations like cannot generate mppTask for some operators. + // Such as when the build side of HashJoin is Projection, + // which cannot pushdown to tiflash(because TiFlash doesn't support some expr in Proj) + // So HashJoin cannot pushdown to tiflash. But we still want TableScan to run on tiflash. task = mppTask task = task.convertToRootTask(ds.ctx) if !task.invalid() { diff --git a/planner/core/task.go b/planner/core/task.go index 29b308b3db02d..f8155f9170e95 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1163,9 +1163,9 @@ func (p *PhysicalUnionAll) attach2MppTasks(tasks ...task) task { func (p *PhysicalUnionAll) attach2Task(tasks ...task) task { for _, t := range tasks { if _, ok := t.(*mppTask); ok && p.TP() == plancodec.TypePartitionUnion { - // In attach2MppTasks(), will attach Union to mppTask directly. - // But PartitionUnion cannot pushdown to tiflash, so disable PartitionUnion pushdown to tiflash explicitly. - // TODO: Let mppTask convertToRootTask instead of return invalidTask directly. + // In attach2MppTasks(), will attach PhysicalUnion to mppTask directly. + // But PartitionUnion cannot pushdown to tiflash, so here disable PartitionUnion pushdown to tiflash explicitly. + // For now, return invalidTask immediately, we can refine this by letting childTask of PartitionUnion convert to rootTask. 
return invalidTask } } From 62198293e8865bc63f8a5fbc72ea78dfdc946bba Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 30 Jan 2023 20:38:46 +0800 Subject: [PATCH 10/11] add case Signed-off-by: guo-shaoge --- planner/core/testdata/plan_suite_in.json | 1 + planner/core/testdata/plan_suite_out.json | 28 +++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/planner/core/testdata/plan_suite_in.json b/planner/core/testdata/plan_suite_in.json index ee95b65bd469e..6f6e74fac3cfa 100644 --- a/planner/core/testdata/plan_suite_in.json +++ b/planner/core/testdata/plan_suite_in.json @@ -1191,6 +1191,7 @@ "name": "TestHashAggPushdownToTiFlashCompute", "cases": [ "select /*+ agg_to_cop() hash_agg() */ avg( distinct tbl_15.col_96 ) as r0 , min( tbl_15.col_92 ) as r1 , sum( distinct tbl_15.col_91 ) as r2 , max( tbl_15.col_92 ) as r3 from tbl_15 where tbl_15.col_94 != '2033-01-09' and tbl_15.col_93 > 7623.679908049186 order by r0,r1,r2,r3 limit 79 ;", + "select /*+ agg_to_cop() hash_agg() */ count(1) from tbl_15 ;", "select /*+ agg_to_cop() stream_agg() */ avg( tbl_16.col_100 ) as r0 from tbl_16 where tbl_16.col_100 in ( 10672141 ) or tbl_16.col_104 in ( 'yfEG1t!*b' ,'C1*bqx_qyO' ,'vQ^yUpKHr&j#~' ) group by tbl_16.col_100 order by r0 limit 20 ;" ] } diff --git a/planner/core/testdata/plan_suite_out.json b/planner/core/testdata/plan_suite_out.json index e2e9a184d2c3d..14213e2223dab 100644 --- a/planner/core/testdata/plan_suite_out.json +++ b/planner/core/testdata/plan_suite_out.json @@ -7548,6 +7548,34 @@ ], "Warning": null }, + { + "SQL": "select /*+ agg_to_cop() hash_agg() */ count(1) from tbl_15 ;", + "Plan": [ + "HashAgg 1.00 root funcs:count(Column#12)->Column#11", + "└─PartitionUnion 4.00 root ", + " ├─HashAgg 1.00 root funcs:count(Column#13)->Column#12", + " │ └─TableReader 1.00 root data:ExchangeSender", + " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(test.tbl_15.col_91)->Column#13", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p0 keep order:false, stats:pseudo", + " ├─HashAgg 1.00 root funcs:count(Column#14)->Column#12", + " │ └─TableReader 1.00 root data:ExchangeSender", + " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(test.tbl_15.col_91)->Column#14", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p1 keep order:false, stats:pseudo", + " ├─HashAgg 1.00 root funcs:count(Column#15)->Column#12", + " │ └─TableReader 1.00 root data:ExchangeSender", + " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(test.tbl_15.col_91)->Column#15", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p2 keep order:false, stats:pseudo", + " └─HashAgg 1.00 root funcs:count(Column#16)->Column#12", + " └─TableReader 1.00 root data:ExchangeSender", + " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1.00 mpp[tiflash] funcs:count(test.tbl_15.col_91)->Column#16", + " └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p3 keep order:false, stats:pseudo" + ], + "Warning": null + }, { "SQL": "select /*+ agg_to_cop() stream_agg() */ avg( tbl_16.col_100 ) as r0 from tbl_16 where tbl_16.col_100 in ( 10672141 ) or tbl_16.col_104 in ( 'yfEG1t!*b' ,'C1*bqx_qyO' ,'vQ^yUpKHr&j#~' ) group by tbl_16.col_100 order by r0 limit 20 ;", "Plan": [ From ef4e5c93e07f7cdae64500cf9527c38441ab9a21 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: 
Tue, 31 Jan 2023 13:24:04 +0800 Subject: [PATCH 11/11] fix comments Signed-off-by: guo-shaoge --- planner/core/rule_aggregation_push_down.go | 7 ++++++- planner/core/task.go | 14 ++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/planner/core/rule_aggregation_push_down.go b/planner/core/rule_aggregation_push_down.go index 8663f43feeb74..24aef4161a8ec 100644 --- a/planner/core/rule_aggregation_push_down.go +++ b/planner/core/rule_aggregation_push_down.go @@ -405,7 +405,12 @@ func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, u if err != nil { return nil, err } - firstRow.Mode = aggregation.Partial1Mode + // Update mode of new generated firstRow as other agg funcs. + if len(agg.AggFuncs) != 0 { + firstRow.Mode = agg.AggFuncs[0].Mode + } else { + firstRow.Mode = aggregation.Partial1Mode + } newAgg.AggFuncs = append(newAgg.AggFuncs, firstRow) } tmpSchema := expression.NewSchema(newAgg.GetGroupByCols()...) diff --git a/planner/core/task.go b/planner/core/task.go index f8155f9170e95..ff4e22756f15a 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1161,16 +1161,14 @@ func (p *PhysicalUnionAll) attach2MppTasks(tasks ...task) task { } func (p *PhysicalUnionAll) attach2Task(tasks ...task) task { - for _, t := range tasks { - if _, ok := t.(*mppTask); ok && p.TP() == plancodec.TypePartitionUnion { - // In attach2MppTasks(), will attach PhysicalUnion to mppTask directly. - // But PartitionUnion cannot pushdown to tiflash, so here disable PartitionUnion pushdown to tiflash explicitly. - // For now, return invalidTask immediately, we can refine this by letting childTask of PartitionUnion convert to rootTask. - return invalidTask - } - } for _, t := range tasks { if _, ok := t.(*mppTask); ok { + if p.TP() == plancodec.TypePartitionUnion { + // In attach2MppTasks(), will attach PhysicalUnion to mppTask directly. + // But PartitionUnion cannot pushdown to tiflash, so here disable PartitionUnion pushdown to tiflash explicitly. + // For now, return invalidTask immediately, we can refine this by letting childTask of PartitionUnion convert to rootTask. + return invalidTask + } return p.attach2MppTasks(tasks...) } }
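
Reviewer note (illustration only, not part of the patch series): the net effect of patches 02, 04, 09 and 11 on PhysicalUnionAll.attach2Task is that a PartitionUnion whose child tasks are MPP tasks is rejected (invalidTask) rather than attached to TiFlash, because TiFlash cannot execute PartitionUnion; the planner then falls back to a root-side plan. The sketch below is a minimal stand-alone model of that guard. The types task, mppTask, rootTask and unionAll here are simplified stand-ins invented for the example, not TiDB's real planner API.

package main

import "fmt"

// task is a simplified stand-in for the planner's task interface.
type task interface{ isMPP() bool }

// mppTask models a plan fragment that would run on TiFlash via MPP.
type mppTask struct{}

func (mppTask) isMPP() bool { return true }

// rootTask models a plan fragment that runs on the TiDB side.
type rootTask struct{}

func (rootTask) isMPP() bool { return false }

// unionAll models PhysicalUnionAll; tp distinguishes a plain Union from a
// PartitionUnion generated by static partition pruning.
type unionAll struct {
	tp string // "Union" or "PartitionUnion"
}

// attach2Task mirrors the guard added in the series: if any child is an MPP
// task and the operator is a PartitionUnion, refuse the MPP attachment so the
// planner falls back instead of pushing PartitionUnion down to TiFlash.
func (u unionAll) attach2Task(children ...task) string {
	for _, t := range children {
		if t.isMPP() {
			if u.tp == "PartitionUnion" {
				return "invalidTask" // PartitionUnion cannot run in TiFlash
			}
			return "attach2MppTasks" // plain Union may stay in MPP
		}
	}
	return "root-side union" // no MPP children: build the union on TiDB
}

func main() {
	fmt.Println(unionAll{tp: "PartitionUnion"}.attach2Task(mppTask{})) // invalidTask
	fmt.Println(unionAll{tp: "Union"}.attach2Task(mppTask{}))          // attach2MppTasks
	fmt.Println(unionAll{tp: "PartitionUnion"}.attach2Task(rootTask{})) // root-side union
}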