Skip to content

Commit

Permalink
planner: prohibit StreamAgg with group keys for TiFlash (pingcap#39547)…
Browse files Browse the repository at this point in the history
  • Loading branch information
fixdb authored Jan 11, 2023
1 parent 2fa990e commit df459f1
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
30 changes: 30 additions & 0 deletions executor/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,36 @@ func TestAggPushDownCountStar(t *testing.T) {
tk.MustQuery("select count(*) from c, o where c.c_id=o.c_id").Check(testkit.Rows("5"))
}

func TestGroupStreamAggOnTiFlash(t *testing.T) {
store, clean := testkit.CreateMockStore(t, withMockTiFlash(2))
defer clean()
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists foo")
tk.MustExec("create table foo(a int, b int, c int, d int, primary key(a,b,c,d))")
tk.MustExec("alter table foo set tiflash replica 1")
tk.MustExec("insert into foo values(1,2,3,1),(1,2,3,6),(1,2,3,5)," +
"(1,2,3,2),(1,2,3,4),(1,2,3,7),(1,2,3,3),(1,2,3,0)")
tb := external.GetTableByName(t, tk, "test", "foo")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("set @@tidb_allow_mpp=0")
sql := "select a,b,c,count(*) from foo group by a,b,c order by a,b,c"
tk.MustQuery(sql).Check(testkit.Rows("1 2 3 8"))
rows := tk.MustQuery("explain " + sql).Rows()

for _, row := range rows {
resBuff := bytes.NewBufferString("")
fmt.Fprintf(resBuff, "%s\n", row)
res := resBuff.String()
// StreamAgg with group keys on TiFlash is not supported
if strings.Contains(res, "tiflash") {
require.NotContains(t, res, "StreamAgg")
}
}
}

func TestTiflashEmptyDynamicPruneResult(t *testing.T) {
store, clean := testkit.CreateMockStore(t, withMockTiFlash(2))
defer clean()
Expand Down
8 changes: 6 additions & 2 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1662,8 +1662,12 @@ func (p *PhysicalStreamAgg) attach2Task(tasks ...task) task {
inputRows = t.count()
attachPlan2Task(p, t)
} else {
copTaskType := cop.getStoreType()
partialAgg, finalAgg := p.newPartialAggregate(copTaskType, false)
storeType := cop.getStoreType()
// TiFlash doesn't support Stream Aggregation
if storeType == kv.TiFlash && len(p.GroupByItems) > 0 {
return invalidTask
}
partialAgg, finalAgg := p.newPartialAggregate(storeType, false)
if finalAgg != nil {
final = finalAgg.(*PhysicalStreamAgg)
}
Expand Down

0 comments on commit df459f1

Please sign in to comment.