diff --git a/pkg/planner/cascades/memo/BUILD.bazel b/pkg/planner/cascades/memo/BUILD.bazel
index 64cf751ca9ec2..900fa9a0c386a 100644
--- a/pkg/planner/cascades/memo/BUILD.bazel
+++ b/pkg/planner/cascades/memo/BUILD.bazel
@@ -11,13 +11,14 @@ go_library(
     importpath = "github.com/pingcap/tidb/pkg/planner/cascades/memo",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/expression",
         "//pkg/planner/cascades/base",
         "//pkg/planner/cascades/pattern",
         "//pkg/planner/cascades/util",
         "//pkg/planner/core/base",
         "//pkg/planner/property",
-        "//pkg/sessionctx",
         "//pkg/util/intest",
+        "@com_github_pingcap_failpoint//:failpoint",
     ],
 )
 
@@ -36,9 +37,10 @@ go_test(
     deps = [
         "//pkg/expression",
         "//pkg/planner/cascades/base",
-        "//pkg/planner/core",
         "//pkg/planner/core/operator/logicalop",
         "//pkg/testkit/testsetup",
+        "//pkg/util/mock",
+        "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_stretchr_testify//require",
         "@org_uber_go_goleak//:goleak",
     ],
diff --git a/pkg/planner/cascades/memo/group.go b/pkg/planner/cascades/memo/group.go
index 1fc1887f26e72..c9a7f4a3286d1 100644
--- a/pkg/planner/cascades/memo/group.go
+++ b/pkg/planner/cascades/memo/group.go
@@ -17,12 +17,12 @@ package memo
 import (
 	"container/list"
 	"fmt"
-	"strconv"
 
 	"github.com/pingcap/tidb/pkg/planner/cascades/base"
 	"github.com/pingcap/tidb/pkg/planner/cascades/pattern"
 	"github.com/pingcap/tidb/pkg/planner/cascades/util"
 	"github.com/pingcap/tidb/pkg/planner/property"
+	"github.com/pingcap/tidb/pkg/util/intest"
 )
 
 var _ base.HashEquals = &Group{}
@@ -127,9 +127,47 @@ func (g *Group) GetFirstElem(operand pattern.Operand) *list.Element {
 	return g.Operand2FirstExpr[operand]
 }
 
+// HasLogicalProperty check whether current group has the logical property.
+func (g *Group) HasLogicalProperty() bool {
+	return g.logicalProp != nil
+}
+
+// GetLogicalProperty return this group's logical property.
+func (g *Group) GetLogicalProperty() *property.LogicalProperty {
+	intest.Assert(g.logicalProp != nil)
+	return g.logicalProp
+}
+
+// SetLogicalProperty set this group's logical property.
+func (g *Group) SetLogicalProperty(prop *property.LogicalProperty) {
+	g.logicalProp = prop
+}
+
+// IsExplored returns whether this group is explored.
+func (g *Group) IsExplored() bool {
+	return g.explored
+}
+
+// SetExplored set the group as tagged as explored.
+func (g *Group) SetExplored() {
+	g.explored = true
+}
+
 // String implements fmt.Stringer interface.
 func (g *Group) String(w util.StrBufferWriter) {
-	w.WriteString(fmt.Sprintf("inputs:%s", strconv.Itoa(int(g.groupID))))
+	w.WriteString(fmt.Sprintf("GID:%d", int(g.groupID)))
+}
+
+// ForEachGE traverse the inside group expression with f call on them each.
+func (g *Group) ForEachGE(f func(ge *GroupExpression) bool) {
+	var next bool
+	for elem := g.logicalExpressions.Front(); elem != nil; elem = elem.Next() {
+		expr := elem.Value.(*GroupExpression)
+		next = f(expr)
+		if !next {
+			break
+		}
+	}
 }
 
 // NewGroup creates a new Group with given logical prop.
diff --git a/pkg/planner/cascades/memo/group_expr.go b/pkg/planner/cascades/memo/group_expr.go
index 9c8d1c83a6115..5135bdc4c53c1 100644
--- a/pkg/planner/cascades/memo/group_expr.go
+++ b/pkg/planner/cascades/memo/group_expr.go
@@ -15,10 +15,13 @@
 package memo
 
 import (
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/pkg/expression"
 	base2 "github.com/pingcap/tidb/pkg/planner/cascades/base"
 	"github.com/pingcap/tidb/pkg/planner/cascades/pattern"
 	"github.com/pingcap/tidb/pkg/planner/cascades/util"
 	"github.com/pingcap/tidb/pkg/planner/core/base"
+	"github.com/pingcap/tidb/pkg/planner/property"
 	"github.com/pingcap/tidb/pkg/util/intest"
 )
 
@@ -122,3 +125,44 @@ func (e *GroupExpression) Init(h base2.Hasher) {
 	e.Hash64(h)
 	e.hash64 = h.Sum64()
 }
+
+// DeriveLogicalProp derive the new group's logical property from a specific GE.
+// DeriveLogicalProp is not called with recursive, because we only examine and
+// init new group from bottom-up, so we can sure that this new group's children
+// has already gotten its logical prop.
+func (e *GroupExpression) DeriveLogicalProp() (err error) {
+	if e.GetGroup().HasLogicalProperty() {
+		return nil
+	}
+	childStats := make([]*property.StatsInfo, 0, len(e.Inputs))
+	childSchema := make([]*expression.Schema, 0, len(e.Inputs))
+	for _, childG := range e.Inputs {
+		childGProp := childG.GetLogicalProperty()
+		childStats = append(childStats, childGProp.Stats)
+		childSchema = append(childSchema, childGProp.Schema)
+	}
+	e.GetGroup().SetLogicalProperty(property.NewLogicalProp())
+	// currently the schemaProducer side logical op is still useful for group schema.
+	// just add this mock for a mocked logical-plan which is with the id less than 0.
+	//  todo: functional dependency
+	tmpSchema := e.LogicalPlan.Schema()
+	tmpStats := e.LogicalPlan.StatsInfo()
+	// only for those new created logical op from XForm, we should rebuild their stats;
+	// in memo init phase, all logical ops has maintained their stats already, just use them.
+	if tmpStats == nil {
+		skipDeriveStats := false
+		failpoint.Inject("MockPlanSkipMemoDeriveStats", func(val failpoint.Value) {
+			skipDeriveStats = val.(bool)
+		})
+		if !skipDeriveStats {
+			// here can only derive the basic stats from bottom up, we can't pass any colGroups required by parents.
+			tmpStats, err = e.LogicalPlan.DeriveStats(childStats, tmpSchema, childSchema, nil)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	e.GetGroup().GetLogicalProperty().Schema = tmpSchema
+	e.GetGroup().GetLogicalProperty().Stats = tmpStats
+	return nil
+}
diff --git a/pkg/planner/cascades/memo/memo.go b/pkg/planner/cascades/memo/memo.go
index f6b0438f02e41..c5c51ab3ffedd 100644
--- a/pkg/planner/cascades/memo/memo.go
+++ b/pkg/planner/cascades/memo/memo.go
@@ -19,17 +19,13 @@ import (
 
 	base2 "github.com/pingcap/tidb/pkg/planner/cascades/base"
 	"github.com/pingcap/tidb/pkg/planner/core/base"
-	"github.com/pingcap/tidb/pkg/sessionctx"
 	"github.com/pingcap/tidb/pkg/util/intest"
 )
 
 // Memo is the main structure of the memo package.
 type Memo struct {
-	// ctx is the context of the memo.
-	sCtx sessionctx.Context
-
 	// groupIDGen is the incremental group id for internal usage.
-	groupIDGen GroupIDGenerator
+	groupIDGen *GroupIDGenerator
 
 	// rootGroup is the root group of the memo.
 	rootGroup *Group
@@ -48,10 +44,9 @@ type Memo struct {
 }
 
 // NewMemo creates a new memo.
-func NewMemo(ctx sessionctx.Context) *Memo {
+func NewMemo() *Memo {
 	return &Memo{
-		sCtx:          ctx,
-		groupIDGen:    GroupIDGenerator{id: 0},
+		groupIDGen:    &GroupIDGenerator{id: 0},
 		groups:        list.New(),
 		groupID2Group: make(map[GroupID]*list.Element),
 		hasher:        base2.NewHashEqualer(),
@@ -59,73 +54,122 @@ func NewMemo(ctx sessionctx.Context) *Memo {
 }
 
 // GetHasher gets a hasher from the memo that ready to use.
-func (m *Memo) GetHasher() base2.Hasher {
-	m.hasher.Reset()
-	return m.hasher
+func (mm *Memo) GetHasher() base2.Hasher {
+	mm.hasher.Reset()
+	return mm.hasher
 }
 
-// CopyIn copies a logical plan into the memo with format as GroupExpression.
-func (m *Memo) CopyIn(target *Group, lp base.LogicalPlan) *GroupExpression {
+// CopyIn copies a MemoExpression representation into the memo with format as GroupExpression inside.
+// The generic logical forest inside memo is represented as memo group expression tree, while for entering
+// and re-feeding the memo, we use the memoExpression as the currency:
+//
+// entering(init memo)
+//
+//	  lp                          ┌──────────┐
+//	 /  \                         │ memo:    │
+//	lp   lp       --copyIN->      │  G(ge)   │
+//	    /  \                      │   /  \   │
+//	  ...  ...                    │  G    G  │
+//	                              └──────────┘
+//
+// re-feeding (intake XForm output)
+//
+//	  lp                          ┌──────────┐
+//	 /  \                         │ memo:    │
+//	GE  lp        --copyIN->      │  G(ge)   │
+//	     |                        │   /  \   │
+//	    GE                        │  G    G  │
+//	                              └──────────┘
+//
+// the bare lp means the new created logical op or that whose child has changed which invalidate it's original
+// old belonged group, make it back to bare-lp for re-inserting again in copyIn.
+func (mm *Memo) CopyIn(target *Group, lp base.LogicalPlan) (*GroupExpression, error) {
 	// Group the children first.
 	childGroups := make([]*Group, 0, len(lp.Children()))
 	for _, child := range lp.Children() {
-		// todo: child.getGroupExpression.GetGroup directly
-		groupExpr := m.CopyIn(nil, child)
-		group := groupExpr.group
-		intest.Assert(group != nil)
-		intest.Assert(group != target)
-		childGroups = append(childGroups, group)
+		var currentChildG *Group
+		if ge, ok := child.(*GroupExpression); ok {
+			// which means it's the earliest unchanged GroupExpression from rule XForm.
+			currentChildG = ge.GetGroup()
+		} else {
+			// which means it's a new/changed logical op, downward to get its input group ids to complete it.
+			ge, err := mm.CopyIn(nil, child)
+			if err != nil {
+				return nil, err
+			}
+			currentChildG = ge.GetGroup()
+		}
+		intest.Assert(currentChildG != nil)
+		intest.Assert(currentChildG != target)
+		childGroups = append(childGroups, currentChildG)
 	}
-
-	hasher := m.GetHasher()
+	hasher := mm.GetHasher()
 	groupExpr := NewGroupExpression(lp, childGroups)
 	groupExpr.Init(hasher)
-	m.insertGroupExpression(groupExpr, target)
-	// todo: new group need to derive the logical property.
-	return groupExpr
+	if mm.InsertGroupExpression(groupExpr, target) && target == nil {
+		// derive logical property for new group.
+		err := groupExpr.DeriveLogicalProp()
+		if err != nil {
+			return nil, err
+		}
+	}
+	return groupExpr, nil
 }
 
 // GetGroups gets all groups in the memo.
-func (m *Memo) GetGroups() *list.List {
-	return m.groups
+func (mm *Memo) GetGroups() *list.List {
+	return mm.groups
 }
 
 // GetGroupID2Group gets the map from group id to group.
-func (m *Memo) GetGroupID2Group() map[GroupID]*list.Element {
-	return m.groupID2Group
+func (mm *Memo) GetGroupID2Group() map[GroupID]*list.Element {
+	return mm.groupID2Group
 }
 
 // GetRootGroup gets the root group of the memo.
-func (m *Memo) GetRootGroup() *Group {
-	return m.rootGroup
+func (mm *Memo) GetRootGroup() *Group {
+	return mm.rootGroup
 }
 
+// InsertGroupExpression insert ge into a target group.
 // @bool indicates whether the groupExpr is inserted to a new group.
-func (m *Memo) insertGroupExpression(groupExpr *GroupExpression, target *Group) bool {
+func (mm *Memo) InsertGroupExpression(groupExpr *GroupExpression, target *Group) bool {
 	// for group merge, here groupExpr is the new groupExpr with undetermined belonged group.
 	// we need to use groupExpr hash to find whether there is same groupExpr existed before.
 	// if existed and the existed groupExpr.Group is not same with target, we should merge them up.
 	// todo: merge group
 	if target == nil {
-		target = m.NewGroup()
-		m.groups.PushBack(target)
-		m.groupID2Group[target.groupID] = m.groups.Back()
+		target = mm.NewGroup()
+		mm.groups.PushBack(target)
+		mm.groupID2Group[target.groupID] = mm.groups.Back()
 	}
 	target.Insert(groupExpr)
 	return true
 }
 
 // NewGroup creates a new group.
-func (m *Memo) NewGroup() *Group {
+func (mm *Memo) NewGroup() *Group {
 	group := NewGroup(nil)
-	group.groupID = m.groupIDGen.NextGroupID()
+	group.groupID = mm.groupIDGen.NextGroupID()
 	return group
 }
 
 // Init initializes the memo with a logical plan, converting logical plan tree format into group tree.
-func (m *Memo) Init(plan base.LogicalPlan) *GroupExpression {
-	intest.Assert(m.groups.Len() == 0)
-	gE := m.CopyIn(nil, plan)
-	m.rootGroup = gE.GetGroup()
+func (mm *Memo) Init(plan base.LogicalPlan) *GroupExpression {
+	intest.Assert(mm.groups.Len() == 0)
+	gE, _ := mm.CopyIn(nil, plan)
+	mm.rootGroup = gE.GetGroup()
 	return gE
 }
+
+// ForEachGroup traverse the inside group expression with f call on them each.
+func (mm *Memo) ForEachGroup(f func(g *Group) bool) {
+	var next bool
+	for elem := mm.GetGroups().Front(); elem != nil; elem = elem.Next() {
+		expr := elem.Value.(*Group)
+		next = f(expr)
+		if !next {
+			break
+		}
+	}
+}
diff --git a/pkg/planner/cascades/memo/memo_test.go b/pkg/planner/cascades/memo/memo_test.go
index 38018780197e3..29b1416a4a09e 100644
--- a/pkg/planner/cascades/memo/memo_test.go
+++ b/pkg/planner/cascades/memo/memo_test.go
@@ -17,26 +17,31 @@ package memo
 import (
 	"testing"
 
-	plannercore "github.com/pingcap/tidb/pkg/planner/core"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
+	"github.com/pingcap/tidb/pkg/util/mock"
 	"github.com/stretchr/testify/require"
 )
 
 func TestMemo(t *testing.T) {
-	ctx := plannercore.MockContext()
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats", `return(true)`))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats"))
+	}()
+	ctx := mock.NewContext()
 	t1 := logicalop.DataSource{}.Init(ctx, 0)
 	t2 := logicalop.DataSource{}.Init(ctx, 0)
 	join := logicalop.LogicalJoin{}.Init(ctx, 0)
 	join.SetChildren(t1, t2)
 
-	memo := NewMemo(ctx)
-	memo.Init(join)
-	require.Equal(t, 3, memo.groups.Len())
-	require.Equal(t, 3, len(memo.groupID2Group))
+	mm := NewMemo()
+	mm.Init(join)
+	require.Equal(t, 3, mm.GetGroups().Len())
+	require.Equal(t, 3, len(mm.GetGroupID2Group()))
 
 	// iter memo.groups to assert group ids.
 	cnt := 1
-	for e := memo.groups.Front(); e != nil; e = e.Next() {
+	for e := mm.GetGroups().Front(); e != nil; e = e.Next() {
 		group := e.Value.(*Group)
 		require.NotNil(t, group)
 		require.Equal(t, GroupID(cnt), group.groupID)
@@ -45,35 +50,40 @@ func TestMemo(t *testing.T) {
 }
 
 func TestInsertGE(t *testing.T) {
-	ctx := plannercore.MockContext()
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats", `return(true)`))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats"))
+	}()
+	ctx := mock.NewContext()
 	t1 := logicalop.DataSource{}.Init(ctx, 0)
 	t2 := logicalop.DataSource{}.Init(ctx, 0)
 	join := logicalop.LogicalJoin{}.Init(ctx, 0)
 	join.SetChildren(t1, t2)
 
-	memo := NewMemo(ctx)
-	memo.Init(join)
-	require.Equal(t, 3, memo.groups.Len())
-	require.Equal(t, 3, len(memo.groupID2Group))
+	mm := NewMemo()
+	mm.Init(join)
+	require.Equal(t, 3, mm.GetGroups().Len())
+	require.Equal(t, 3, len(mm.GetGroupID2Group()))
 
 	// prepare a new group expression with join's group as its children.
 	limit := logicalop.LogicalLimit{}.Init(ctx, 0)
-	hasher := memo.GetHasher()
-	groupExpr := NewGroupExpression(limit, []*Group{memo.rootGroup})
+	limit.SetID(-4)
+	hasher := mm.GetHasher()
+	groupExpr := NewGroupExpression(limit, []*Group{mm.GetRootGroup()})
 	groupExpr.Init(hasher)
 
 	// Insert a new group with a new expression.
-	memo.insertGroupExpression(groupExpr, nil)
-	require.Equal(t, 4, memo.groups.Len())
-	require.Equal(t, 4, len(memo.groupID2Group))
+	mm.InsertGroupExpression(groupExpr, nil)
+	require.Equal(t, 4, mm.GetGroups().Len())
+	require.Equal(t, 4, len(mm.GetGroupID2Group()))
 
 	// iter memo.groups to assert group ids.
 	cnt := 1
-	for e := memo.groups.Front(); e != nil; e = e.Next() {
+	for e := mm.GetGroups().Front(); e != nil; e = e.Next() {
 		group := e.Value.(*Group)
 		require.NotNil(t, group)
-		require.Equal(t, GroupID(cnt), group.groupID)
+		require.Equal(t, GroupID(cnt), group.GetGroupID())
 		cnt++
 	}
-	require.Equal(t, memo.groups.Back().Value.(*Group).groupID, GroupID(cnt-1))
+	require.Equal(t, mm.GetGroups().Back().Value.(*Group).GetGroupID(), GroupID(cnt-1))
 }
diff --git a/pkg/planner/cascades/rule/BUILD.bazel b/pkg/planner/cascades/rule/BUILD.bazel
index 9937e04420944..3b03895534728 100644
--- a/pkg/planner/cascades/rule/BUILD.bazel
+++ b/pkg/planner/cascades/rule/BUILD.bazel
@@ -31,6 +31,7 @@ go_test(
         "//pkg/planner/cascades/util",
         "//pkg/planner/core/operator/logicalop",
         "//pkg/util/mock",
+        "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_stretchr_testify//require",
     ],
 )
diff --git a/pkg/planner/cascades/rule/binder_test.go b/pkg/planner/cascades/rule/binder_test.go
index 44bea77656d16..49fbac0f4d9b7 100644
--- a/pkg/planner/cascades/rule/binder_test.go
+++ b/pkg/planner/cascades/rule/binder_test.go
@@ -18,6 +18,7 @@ import (
 	"bytes"
 	"testing"
 
+	"github.com/pingcap/failpoint"
 	pmodel "github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tidb/pkg/planner/cascades/memo"
 	"github.com/pingcap/tidb/pkg/planner/cascades/pattern"
@@ -28,13 +29,17 @@ import (
 )
 
 func TestBinderSuccess(t *testing.T) {
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats", `return(true)`))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats"))
+	}()
 	ctx := mock.NewContext()
 	t1 := logicalop.DataSource{}.Init(ctx, 0)
 	t2 := logicalop.DataSource{}.Init(ctx, 0)
 	join := logicalop.LogicalJoin{}.Init(ctx, 0)
 	join.SetChildren(t1, t2)
 
-	mm := memo.NewMemo(ctx)
+	mm := memo.NewMemo()
 	mm.Init(join)
 	require.Equal(t, 3, mm.GetGroups().Len())
 	require.Equal(t, 3, len(mm.GetGroupID2Group()))
@@ -62,13 +67,17 @@ func TestBinderSuccess(t *testing.T) {
 }
 
 func TestBinderFail(t *testing.T) {
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats", `return(true)`))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats"))
+	}()
 	ctx := mock.NewContext()
 	t1 := logicalop.DataSource{}.Init(ctx, 0)
 	t2 := logicalop.DataSource{}.Init(ctx, 0)
 	join := logicalop.LogicalJoin{}.Init(ctx, 0)
 	join.SetChildren(t1, t2)
 
-	mm := memo.NewMemo(ctx)
+	mm := memo.NewMemo()
 	mm.Init(join)
 	require.Equal(t, 3, mm.GetGroups().Len())
 	require.Equal(t, 3, len(mm.GetGroupID2Group()))
@@ -107,7 +116,7 @@ func TestBinderFail(t *testing.T) {
 	require.Equal(t, b.String(), "")
 
 	// renew memo
-	mm = memo.NewMemo(ctx)
+	mm = memo.NewMemo()
 	mm.Init(p1)
 	rootGE = mm.GetRootGroup().GetLogicalExpressions().Back().Value.(*memo.GroupExpression)
 	binder = NewBinder(pa, rootGE)
@@ -117,17 +126,21 @@ func TestBinderFail(t *testing.T) {
 	holder = binder.Next()
 	require.Nil(t, holder)
 	buf.Flush()
-	require.Equal(t, b.String(), "GE:Limit_4{inputs:1}\n")
+	require.Equal(t, b.String(), "GE:Limit_4{GID:1}\n")
 }
 
 func TestBinderTopNode(t *testing.T) {
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats", `return(true)`))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats"))
+	}()
 	ctx := mock.NewContext()
 	t1 := logicalop.DataSource{}.Init(ctx, 0)
 	t2 := logicalop.DataSource{}.Init(ctx, 0)
 	join := logicalop.LogicalJoin{}.Init(ctx, 0)
 	join.SetChildren(t1, t2)
 
-	mm := memo.NewMemo(ctx)
+	mm := memo.NewMemo()
 	mm.Init(join)
 	require.Equal(t, 3, mm.GetGroups().Len())
 	require.Equal(t, 3, len(mm.GetGroupID2Group()))
@@ -141,10 +154,14 @@ func TestBinderTopNode(t *testing.T) {
 }
 
 func TestBinderOneNode(t *testing.T) {
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats", `return(true)`))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats"))
+	}()
 	ctx := mock.NewContext()
 	join := logicalop.LogicalJoin{}.Init(ctx, 0)
 
-	mm := memo.NewMemo(ctx)
+	mm := memo.NewMemo()
 	mm.Init(join)
 	require.Equal(t, 1, mm.GetGroups().Len())
 	require.Equal(t, 1, len(mm.GetGroupID2Group()))
@@ -157,6 +174,10 @@ func TestBinderOneNode(t *testing.T) {
 }
 
 func TestBinderSubTreeMatch(t *testing.T) {
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats", `return(true)`))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats"))
+	}()
 	ctx := mock.NewContext()
 	t1 := logicalop.DataSource{}.Init(ctx, 0)
 	t2 := logicalop.DataSource{}.Init(ctx, 0)
@@ -171,7 +192,7 @@ func TestBinderSubTreeMatch(t *testing.T) {
 	join3 := logicalop.LogicalJoin{}.Init(ctx, 0)
 	join3.SetChildren(join1, join2)
 
-	mm := memo.NewMemo(ctx)
+	mm := memo.NewMemo()
 	mm.Init(join3)
 	require.Equal(t, 7, mm.GetGroups().Len())
 	require.Equal(t, 7, len(mm.GetGroupID2Group()))
@@ -200,6 +221,10 @@ func TestBinderSubTreeMatch(t *testing.T) {
 }
 
 func TestBinderMultiNext(t *testing.T) {
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats", `return(true)`))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats"))
+	}()
 	ctx := mock.NewContext()
 	asT1 := pmodel.NewCIStr("t1")
 	asT2 := pmodel.NewCIStr("t2")
@@ -213,7 +238,7 @@ func TestBinderMultiNext(t *testing.T) {
 	t3 := logicalop.DataSource{TableAsName: &asT3}.Init(ctx, 0)
 	t4 := logicalop.DataSource{TableAsName: &asT4}.Init(ctx, 0)
 
-	mm := memo.NewMemo(ctx)
+	mm := memo.NewMemo()
 	gE := mm.Init(join1)
 
 	// which means t1 and t3 are equivalent class.
@@ -297,6 +322,10 @@ func TestBinderMultiNext(t *testing.T) {
 }
 
 func TestBinderAny(t *testing.T) {
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats", `return(true)`))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats"))
+	}()
 	ctx := mock.NewContext()
 	asT1 := pmodel.NewCIStr("t1")
 	asT2 := pmodel.NewCIStr("t2")
@@ -310,7 +339,7 @@ func TestBinderAny(t *testing.T) {
 	t3 := logicalop.DataSource{TableAsName: &asT3}.Init(ctx, 0)
 	t4 := logicalop.DataSource{TableAsName: &asT4}.Init(ctx, 0)
 
-	mm := memo.NewMemo(ctx)
+	mm := memo.NewMemo()
 	gE := mm.Init(join1)
 
 	// which means t1 and t3 are equivalent class.
@@ -380,6 +409,10 @@ func TestBinderAny(t *testing.T) {
 }
 
 func TestBinderMultiAny(t *testing.T) {
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats", `return(true)`))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats"))
+	}()
 	ctx := mock.NewContext()
 	asT1 := pmodel.NewCIStr("t1")
 	asT2 := pmodel.NewCIStr("t2")
@@ -393,7 +426,7 @@ func TestBinderMultiAny(t *testing.T) {
 	t3 := logicalop.DataSource{TableAsName: &asT3}.Init(ctx, 0)
 	t4 := logicalop.DataSource{TableAsName: &asT4}.Init(ctx, 0)
 
-	mm := memo.NewMemo(ctx)
+	mm := memo.NewMemo()
 	gE := mm.Init(join1)
 
 	// which means t1 and t3 are equivalent class.
diff --git a/pkg/planner/core/casetest/cascades/BUILD.bazel b/pkg/planner/core/casetest/cascades/BUILD.bazel
new file mode 100644
index 0000000000000..03afd3005333c
--- /dev/null
+++ b/pkg/planner/core/casetest/cascades/BUILD.bazel
@@ -0,0 +1,27 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_test")
+
+go_test(
+    name = "cascades_test",
+    timeout = "short",
+    srcs = [
+        "main_test.go",
+        "memo_test.go",
+    ],
+    data = glob(["testdata/**"]),
+    flaky = True,
+    deps = [
+        "//pkg/parser",
+        "//pkg/planner/cascades/memo",
+        "//pkg/planner/cascades/util",
+        "//pkg/planner/core",
+        "//pkg/planner/core/base",
+        "//pkg/planner/core/resolve",
+        "//pkg/testkit",
+        "//pkg/testkit/testdata",
+        "//pkg/testkit/testmain",
+        "//pkg/testkit/testsetup",
+        "//pkg/util/hint",
+        "@com_github_stretchr_testify//require",
+        "@org_uber_go_goleak//:goleak",
+    ],
+)
diff --git a/pkg/planner/core/casetest/cascades/main_test.go b/pkg/planner/core/casetest/cascades/main_test.go
new file mode 100644
index 0000000000000..0f14a5abcfe3d
--- /dev/null
+++ b/pkg/planner/core/casetest/cascades/main_test.go
@@ -0,0 +1,53 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cascades
+
+import (
+	"flag"
+	"testing"
+
+	"github.com/pingcap/tidb/pkg/testkit/testdata"
+	"github.com/pingcap/tidb/pkg/testkit/testmain"
+	"github.com/pingcap/tidb/pkg/testkit/testsetup"
+	"go.uber.org/goleak"
+)
+
+var testDataMap = make(testdata.BookKeeper)
+
+func TestMain(m *testing.M) {
+	testsetup.SetupForCommonTest()
+	flag.Parse()
+	testDataMap.LoadTestSuiteData("testdata", "cascades_suite")
+	opts := []goleak.Option{
+		goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
+		goleak.IgnoreTopFunction("github.com/bazelbuild/rules_go/go/tools/bzltestutil.RegisterTimeoutHandler.func1"),
+		goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
+		goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
+		goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
+		goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"),
+		goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
+	}
+
+	callback := func(i int) int {
+		testDataMap.GenerateOutputIfNeeded()
+		return i
+	}
+
+	goleak.VerifyTestMain(testmain.WrapTestingM(m, callback), opts...)
+}
+
+func GetCascadesSuiteData() testdata.TestData {
+	return testDataMap["cascades_suite"]
+}
diff --git a/pkg/planner/core/casetest/cascades/memo_test.go b/pkg/planner/core/casetest/cascades/memo_test.go
new file mode 100644
index 0000000000000..dae0149f20db5
--- /dev/null
+++ b/pkg/planner/core/casetest/cascades/memo_test.go
@@ -0,0 +1,121 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cascades
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/pingcap/tidb/pkg/parser"
+	"github.com/pingcap/tidb/pkg/planner/cascades/memo"
+	"github.com/pingcap/tidb/pkg/planner/cascades/util"
+	plannercore "github.com/pingcap/tidb/pkg/planner/core"
+	"github.com/pingcap/tidb/pkg/planner/core/base"
+	"github.com/pingcap/tidb/pkg/planner/core/resolve"
+	"github.com/pingcap/tidb/pkg/testkit"
+	"github.com/pingcap/tidb/pkg/testkit/testdata"
+	"github.com/pingcap/tidb/pkg/util/hint"
+	"github.com/stretchr/testify/require"
+)
+
+func TestDeriveStats(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t1, t2")
+	tk.MustExec("create table t1(a int not null, b int not null, key(a,b))")
+	tk.MustExec("insert into t1 values(1,1),(1,2),(2,1),(2,2),(1,1)")
+	tk.MustExec("create table t2(a int not null, b int not null, key(a,b))")
+	tk.MustExec("insert into t2 values(1,1),(1,2),(1,3),(2,1),(2,2),(2,3),(3,1),(3,2),(3,3),(1,1)")
+	tk.MustExec("analyze table t1")
+	tk.MustExec("analyze table t2")
+
+	ctx := context.Background()
+	p := parser.New()
+	var input []string
+	var output []struct {
+		SQL string
+		Str []string
+	}
+	statsSuiteData := GetCascadesSuiteData()
+	statsSuiteData.LoadTestCases(t, &input, &output)
+	for i, tt := range input {
+		stmt, err := p.ParseOneStmt(tt, "", "")
+		require.NoError(t, err, tt)
+		ret := &plannercore.PreprocessorReturn{}
+		nodeW := resolve.NewNodeW(stmt)
+		err = plannercore.Preprocess(context.Background(), tk.Session(), nodeW, plannercore.WithPreprocessorReturn(ret))
+		require.NoError(t, err)
+		tk.Session().GetSessionVars().PlanColumnID.Store(0)
+		builder, _ := plannercore.NewPlanBuilder().Init(tk.Session().GetPlanCtx(), ret.InfoSchema, hint.NewQBHintHandler(nil))
+		p, err := builder.Build(ctx, nodeW)
+		require.NoError(t, err, tt)
+		p, err = plannercore.LogicalOptimizeTest(ctx, builder.GetOptFlag(), p.(base.LogicalPlan))
+		require.NoError(t, err, tt)
+		lp := p.(base.LogicalPlan)
+		_, err = plannercore.RecursiveDeriveStats4Test(lp)
+		require.NoError(t, err, tt)
+		// after stats derive is done, which means the up-down propagation of group ndv is done, in bottom-up building phase
+		// of memo, we don't have to expect the upper operator's group cols passing down anymore.
+		mm := memo.NewMemo()
+		mm.Init(lp)
+		// check the stats state in memo group.
+		b := &bytes.Buffer{}
+		sb := util.NewStrBuffer(b)
+		var strs []string
+		mm.ForEachGroup(func(g *memo.Group) bool {
+			b.Reset()
+			// record group
+			g.String(sb)
+			sb.WriteString(", ")
+			// record first ge
+			g.ForEachGE(func(ge *memo.GroupExpression) bool {
+				ge.String(sb)
+				return false
+			})
+			sb.WriteString(", ")
+			// record group stats
+			logicProp := g.GetLogicalProperty()
+			if logicProp == nil {
+				sb.WriteString("logic prop:nil")
+			} else {
+				sb.WriteString("logic prop:{")
+				if logicProp.Stats == nil {
+					sb.WriteString("stats:nil,")
+				} else {
+					statsStr := fmt.Sprintf("count %v, ColNDVs %v, GroupNDVs %v", logicProp.Stats.RowCount, logicProp.Stats.ColNDVs, logicProp.Stats.GroupNDVs)
+					sb.WriteString("stats:{" + statsStr + "}")
+				}
+				sb.WriteString(", ")
+				if logicProp.Schema == nil {
+					sb.WriteString("schema:nil")
+				} else {
+					sb.WriteString("schema:{" + logicProp.Schema.String() + "}")
+				}
+				sb.WriteString("}")
+			}
+			sb.Flush()
+			strs = append(strs, b.String())
+			return true
+		})
+		testdata.OnRecord(func() {
+			output[i].SQL = tt
+			output[i].Str = strs
+		})
+		require.Equal(t, output[i].Str, strs, "case i "+tt)
+	}
+}
diff --git a/pkg/planner/core/casetest/cascades/testdata/cascades_suite_in.json b/pkg/planner/core/casetest/cascades/testdata/cascades_suite_in.json
new file mode 100644
index 0000000000000..b87d5e655a16e
--- /dev/null
+++ b/pkg/planner/core/casetest/cascades/testdata/cascades_suite_in.json
@@ -0,0 +1,57 @@
+[
+  {
+    "name": "TestDeriveStats",
+    "cases": [
+      // DataSource -> Aggregation.
+      "select count(1) from t1 group by a, b",
+      // DataSource -> Join.
+      "select * from t1, t2 where t1.a = t2.a and t1.b = t2.b",
+      // DataSource(Range) -> Aggregation.
+      "select count(1) from t1 where a > 0 group by a, b",
+      // DataSource(Selection) -> Aggregation.
+      "select count(1) from t1 where b > 0 group by a, b",
+      // DataSource -> Selection -> Aggregation. Change `cos` to another function if it can be pushed down to copr later.
+      "select count(1) from t1 where cos(a) > 0 group by a, b",
+      // DataSource -> Projection -> Aggregation.
+      "select count(c3) from (select a as c1, b as c2, a+1 as c3 from t1) as tmp group by c2, c1",
+      // DataSource -> Projection -> Aggregation.
+      "select count(c3) from (select a+b as c1, b as c2, a+1 as c3 from t1) as tmp group by c2, c1",
+      // DataSource -> Apply(LeftOuterJoin) -> Aggregation.
+      "select count(tmp.cmp) from (select t1.a as a, t1.b as b, (t1.b > (select t2.b from t2 where t2.a = t1.a)) as cmp from t1) tmp group by tmp.a, tmp.b",
+      // DataSource -> Apply(InnerJoin) -> Aggregation.
+      "select count(1) from (select t1.a as a, t1.b as b from t1 where t1.b > (select t2.b from t2 where t2.a = t1.a)) tmp group by tmp.a, tmp.b",
+      // DataSource -> Apply(LeftOuterSemiJoin) -> Aggregation.
+      "select count(tmp.cmp) from (select t1.a as a, t1.b as b, (t1.b in (select t2.b from t2 where t2.a = t1.a limit 3)) as cmp from t1) tmp group by tmp.a, tmp.b",
+      // DataSource -> Apply(AntiLeftOuterSemiJoin) -> Aggregation.
+      "select count(tmp.cmp) from (select t1.a as a, t1.b as b, (t1.b not in (select t2.b from t2 where t2.a = t1.a limit 3)) as cmp from t1) tmp group by tmp.a, tmp.b",
+      // DataSource -> Apply(SemiJoin) -> Aggregation.
+      "select count(1) from (select t1.a as a, t1.b as b from t1 where t1.b in (select t2.b from t2 where t2.a = t1.a limit 3)) tmp group by tmp.a, tmp.b",
+      // DataSource -> Apply(AntiSemiJoin) -> Aggregation.
+      "select count(1) from (select t1.a as a, t1.b as b from t1 where t1.b not in (select t2.b from t2 where t2.a = t1.a limit 3)) tmp group by tmp.a, tmp.b",
+      // DataSource -> InnerJoin -> Aggregation.
+      "select count(1) from t1, t2 where t1.a = t2.a group by t1.a, t1.b",
+      // DataSource -> LeftOuterJoin -> Aggregation.
+      "select count(1) from t1 left join t2 on t1.a = t2.a group by t1.a, t1.b",
+      // DataSource -> LeftOuterJoin -> Aggregation.
+      "select count(1) from t1 left join t2 on t1.a = t2.a group by t2.a, t2.b",
+      // DataSource -> RightOuterJoin -> Aggregation.
+      "select count(1) from t1 right join t2 on t1.a = t2.a group by t1.a, t1.b",
+      // DataSource -> RightOuterJoin -> Aggregation.
+      "select count(1) from t1 right join t2 on t1.a = t2.a group by t2.a, t2.b",
+      // DataSource -> LeftOuterSemiJoin -> Aggregation.
+      "select count(tmp.cmp) from (select t1.a as a, t1.b as b, (t1.b in (select t2.b from t2 where t2.a > t1.a)) as cmp from t1) tmp group by tmp.a, tmp.b",
+      // DataSource -> AntiLeftOuterSemiJoin -> Aggregation.
+      "select count(tmp.cmp) from (select t1.a as a, t1.b as b, (t1.b not in (select t2.b from t2 where t2.a > t1.a)) as cmp from t1) tmp group by tmp.a, tmp.b",
+      // DataSource -> SemiJoin -> Aggregation.
+      "select count(1) from (select t1.a as a, t1.b as b from t1 where t1.b in (select t2.b from t2 where t2.a > t1.a)) tmp group by tmp.a, tmp.b",
+      // DataSource -> AntiSemiJoin -> Aggregation.
+      "select count(1) from (select t1.a as a, t1.b as b from t1 where t1.b not in (select t2.b from t2 where t2.a > t1.a)) tmp group by tmp.a, tmp.b",
+      // DataSource -> Aggregation -> Join.
+      "select * from t1 left join (select t2.a as a, t2.b as b, count(1) as cnt from t2 group by t2.a, t2.b) as tmp on t1.a = tmp.a and t1.b = tmp.b",
+      // DataSource -> Limit -> Aggregation.
+      "select count(1) from (select t1.a as a, t1.b as b from t1 limit 3) tmp group by tmp.a, tmp.b",
+      // DataSource -> Window -> Aggregation.
+      "select count(tmp.a_sum) from (select t1.a as a, t1.b as b, sum(a) over() as a_sum from t1) tmp group by tmp.a, tmp.b"
+    ]
+  }
+]
diff --git a/pkg/planner/core/casetest/cascades/testdata/cascades_suite_out.json b/pkg/planner/core/casetest/cascades/testdata/cascades_suite_out.json
new file mode 100644
index 0000000000000..af8b391df6569
--- /dev/null
+++ b/pkg/planner/core/casetest/cascades/testdata/cascades_suite_out.json
@@ -0,0 +1,249 @@
+[
+  {
+    "Name": "TestDeriveStats",
+    "Cases": [
+      {
+        "SQL": "select count(1) from t1 group by a, b",
+        "Str": [
+          "GID:1, GE:DataSource_1{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:Aggregation_2{GID:1}, logic prop:{stats:{count 4, ColNDVs map[4:4], GroupNDVs []}, schema:{Column: [Column#4] Key: [] Unique key: []}}",
+          "GID:3, GE:Projection_3{GID:2}, logic prop:{stats:{count 4, ColNDVs map[4:4], GroupNDVs []}, schema:{Column: [Column#4] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select * from t1, t2 where t1.a = t2.a and t1.b = t2.b",
+        "Str": [
+          "GID:1, GE:DataSource_4{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_5{}, logic prop:{stats:{count 10, ColNDVs map[4:3 5:3], GroupNDVs [{[4 5] 9}]}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:3, GE:Join_9{GID:1, GID:2}, logic prop:{stats:{count 5.555555555555555, ColNDVs map[1:2 2:2 4:3 5:3], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b,test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:4, GE:Projection_8{GID:3}, logic prop:{stats:{count 5.555555555555555, ColNDVs map[1:2 2:2 4:3 5:3], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b,test.t2.a,test.t2.b] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(1) from t1 where a > 0 group by a, b",
+        "Str": [
+          "GID:1, GE:DataSource_10{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:Aggregation_12{GID:1}, logic prop:{stats:{count 4, ColNDVs map[4:4], GroupNDVs []}, schema:{Column: [Column#4] Key: [] Unique key: []}}",
+          "GID:3, GE:Projection_13{GID:2}, logic prop:{stats:{count 4, ColNDVs map[4:4], GroupNDVs []}, schema:{Column: [Column#4] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(1) from t1 where b > 0 group by a, b",
+        "Str": [
+          "GID:1, GE:DataSource_14{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:Aggregation_16{GID:1}, logic prop:{stats:{count 4, ColNDVs map[4:4], GroupNDVs []}, schema:{Column: [Column#4] Key: [] Unique key: []}}",
+          "GID:3, GE:Projection_17{GID:2}, logic prop:{stats:{count 4, ColNDVs map[4:4], GroupNDVs []}, schema:{Column: [Column#4] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(1) from t1 where cos(a) > 0 group by a, b",
+        "Str": [
+          "GID:1, GE:DataSource_18{}, logic prop:{stats:{count 4, ColNDVs map[1:1.6 2:1.6], GroupNDVs [{[1 2] 3.2}]}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:Aggregation_20{GID:1}, logic prop:{stats:{count 3.2, ColNDVs map[4:3.2], GroupNDVs []}, schema:{Column: [Column#4] Key: [] Unique key: []}}",
+          "GID:3, GE:Projection_21{GID:2}, logic prop:{stats:{count 3.2, ColNDVs map[4:3.2], GroupNDVs []}, schema:{Column: [Column#4] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(c3) from (select a as c1, b as c2, a+1 as c3 from t1) as tmp group by c2, c1",
+        "Str": [
+          "GID:1, GE:DataSource_22{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:Aggregation_24{GID:1}, logic prop:{stats:{count 4, ColNDVs map[5:4], GroupNDVs []}, schema:{Column: [Column#5] Key: [] Unique key: []}}",
+          "GID:3, GE:Projection_25{GID:2}, logic prop:{stats:{count 4, ColNDVs map[5:4], GroupNDVs []}, schema:{Column: [Column#5] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(c3) from (select a+b as c1, b as c2, a+1 as c3 from t1) as tmp group by c2, c1",
+        "Str": [
+          "GID:1, GE:DataSource_26{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:Aggregation_28{GID:1}, logic prop:{stats:{count 2, ColNDVs map[6:2], GroupNDVs []}, schema:{Column: [Column#6] Key: [] Unique key: []}}",
+          "GID:3, GE:Projection_29{GID:2}, logic prop:{stats:{count 2, ColNDVs map[6:2], GroupNDVs []}, schema:{Column: [Column#6] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(tmp.cmp) from (select t1.a as a, t1.b as b, (t1.b > (select t2.b from t2 where t2.a = t1.a)) as cmp from t1) tmp group by tmp.a, tmp.b",
+        "Str": [
+          "GID:1, GE:DataSource_30{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_33{}, logic prop:{stats:{count 3.333333333333333, ColNDVs map[7:1 8:1], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:3, GE:MaxOneRow_36{GID:2}, logic prop:{stats:{count 1, ColNDVs map[7:1 8:1], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:4, GE:Apply_37{GID:1, GID:3}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2 7:5 8:5], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b,test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:5, GE:Aggregation_38{GID:4}, logic prop:{stats:{count 4, ColNDVs map[11:4], GroupNDVs []}, schema:{Column: [Column#11] Key: [] Unique key: []}}",
+          "GID:6, GE:Projection_39{GID:5}, logic prop:{stats:{count 4, ColNDVs map[11:4], GroupNDVs []}, schema:{Column: [Column#11] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(1) from (select t1.a as a, t1.b as b from t1 where t1.b > (select t2.b from t2 where t2.a = t1.a)) tmp group by tmp.a, tmp.b",
+        "Str": [
+          "GID:1, GE:DataSource_40{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_42{}, logic prop:{stats:{count 3.333333333333333, ColNDVs map[4:1 5:1], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:3, GE:MaxOneRow_45{GID:2}, logic prop:{stats:{count 1, ColNDVs map[4:1 5:1], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:4, GE:Selection_50{GID:3}, logic prop:{stats:{count 0.8, ColNDVs map[4:0.8 5:0.8], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:5, GE:Apply_46{GID:1, GID:4}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2 4:5 5:5], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b,test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:6, GE:Aggregation_48{GID:5}, logic prop:{stats:{count 2, ColNDVs map[7:2], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}",
+          "GID:7, GE:Projection_49{GID:6}, logic prop:{stats:{count 2, ColNDVs map[7:2], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(tmp.cmp) from (select t1.a as a, t1.b as b, (t1.b in (select t2.b from t2 where t2.a = t1.a limit 3)) as cmp from t1) tmp group by tmp.a, tmp.b",
+        "Str": [
+          "GID:1, GE:DataSource_51{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_54{}, logic prop:{stats:{count 3.333333333333333, ColNDVs map[7:1 8:1], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:3, GE:Limit_62{GID:2}, logic prop:{stats:{count 3, ColNDVs map[7:1 8:1], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:4, GE:Apply_58{GID:1, GID:3}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2 10:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b,Column#10] Key: [] Unique key: []}}",
+          "GID:5, GE:Aggregation_59{GID:4}, logic prop:{stats:{count 4, ColNDVs map[11:4], GroupNDVs []}, schema:{Column: [Column#11] Key: [] Unique key: []}}",
+          "GID:6, GE:Projection_60{GID:5}, logic prop:{stats:{count 4, ColNDVs map[11:4], GroupNDVs []}, schema:{Column: [Column#11] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(tmp.cmp) from (select t1.a as a, t1.b as b, (t1.b not in (select t2.b from t2 where t2.a = t1.a limit 3)) as cmp from t1) tmp group by tmp.a, tmp.b",
+        "Str": [
+          "GID:1, GE:DataSource_63{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_66{}, logic prop:{stats:{count 3.333333333333333, ColNDVs map[7:1 8:1], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:3, GE:Limit_74{GID:2}, logic prop:{stats:{count 3, ColNDVs map[7:1 8:1], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:4, GE:Apply_70{GID:1, GID:3}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2 10:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b,Column#10] Key: [] Unique key: []}}",
+          "GID:5, GE:Aggregation_71{GID:4}, logic prop:{stats:{count 4, ColNDVs map[11:4], GroupNDVs []}, schema:{Column: [Column#11] Key: [] Unique key: []}}",
+          "GID:6, GE:Projection_72{GID:5}, logic prop:{stats:{count 4, ColNDVs map[11:4], GroupNDVs []}, schema:{Column: [Column#11] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(1) from (select t1.a as a, t1.b as b from t1 where t1.b in (select t2.b from t2 where t2.a = t1.a limit 3)) tmp group by tmp.a, tmp.b",
+        "Str": [
+          "GID:1, GE:DataSource_75{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_77{}, logic prop:{stats:{count 3.333333333333333, ColNDVs map[4:1 5:1], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:3, GE:Limit_86{GID:2}, logic prop:{stats:{count 3, ColNDVs map[4:1 5:1], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:4, GE:Apply_81{GID:1, GID:3}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:5, GE:Aggregation_83{GID:4}, logic prop:{stats:{count 2, ColNDVs map[7:2], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}",
+          "GID:6, GE:Projection_84{GID:5}, logic prop:{stats:{count 2, ColNDVs map[7:2], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(1) from (select t1.a as a, t1.b as b from t1 where t1.b not in (select t2.b from t2 where t2.a = t1.a limit 3)) tmp group by tmp.a, tmp.b",
+        "Str": [
+          "GID:1, GE:DataSource_87{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_89{}, logic prop:{stats:{count 3.333333333333333, ColNDVs map[4:1 5:1], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:3, GE:Limit_98{GID:2}, logic prop:{stats:{count 3, ColNDVs map[4:1 5:1], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:4, GE:Apply_93{GID:1, GID:3}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:5, GE:Aggregation_95{GID:4}, logic prop:{stats:{count 2, ColNDVs map[7:2], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}",
+          "GID:6, GE:Projection_96{GID:5}, logic prop:{stats:{count 2, ColNDVs map[7:2], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(1) from t1, t2 where t1.a = t2.a group by t1.a, t1.b",
+        "Str": [
+          "GID:1, GE:DataSource_99{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_100{}, logic prop:{stats:{count 10, ColNDVs map[4:3], GroupNDVs []}, schema:{Column: [test.t2.a] Key: [] Unique key: []}}",
+          "GID:3, GE:Join_105{GID:1, GID:2}, logic prop:{stats:{count 16.666666666666668, ColNDVs map[1:2 2:2 4:3], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b,test.t2.a] Key: [] Unique key: []}}",
+          "GID:4, GE:Aggregation_103{GID:3}, logic prop:{stats:{count 2, ColNDVs map[7:2], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}",
+          "GID:5, GE:Projection_104{GID:4}, logic prop:{stats:{count 2, ColNDVs map[7:2], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(1) from t1 left join t2 on t1.a = t2.a group by t1.a, t1.b",
+        "Str": [
+          "GID:1, GE:DataSource_106{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_107{}, logic prop:{stats:{count 10, ColNDVs map[4:3], GroupNDVs []}, schema:{Column: [test.t2.a] Key: [] Unique key: []}}",
+          "GID:3, GE:Join_111{GID:1, GID:2}, logic prop:{stats:{count 16.666666666666668, ColNDVs map[1:2 2:2 4:3], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b,test.t2.a] Key: [] Unique key: []}}",
+          "GID:4, GE:Aggregation_109{GID:3}, logic prop:{stats:{count 4, ColNDVs map[7:4], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}",
+          "GID:5, GE:Projection_110{GID:4}, logic prop:{stats:{count 4, ColNDVs map[7:4], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(1) from t1 left join t2 on t1.a = t2.a group by t2.a, t2.b",
+        "Str": [
+          "GID:1, GE:DataSource_112{}, logic prop:{stats:{count 5, ColNDVs map[1:2], GroupNDVs []}, schema:{Column: [test.t1.a] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_113{}, logic prop:{stats:{count 10, ColNDVs map[4:3 5:3], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:3, GE:Join_117{GID:1, GID:2}, logic prop:{stats:{count 16.666666666666668, ColNDVs map[1:2 4:3 5:3], GroupNDVs []}, schema:{Column: [test.t1.a,test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:4, GE:Aggregation_115{GID:3}, logic prop:{stats:{count 3, ColNDVs map[7:3], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}",
+          "GID:5, GE:Projection_116{GID:4}, logic prop:{stats:{count 3, ColNDVs map[7:3], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(1) from t1 right join t2 on t1.a = t2.a group by t1.a, t1.b",
+        "Str": [
+          "GID:1, GE:DataSource_118{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_119{}, logic prop:{stats:{count 10, ColNDVs map[4:3], GroupNDVs []}, schema:{Column: [test.t2.a] Key: [] Unique key: []}}",
+          "GID:3, GE:Join_123{GID:1, GID:2}, logic prop:{stats:{count 16.666666666666668, ColNDVs map[1:2 2:2 4:3], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b,test.t2.a] Key: [] Unique key: []}}",
+          "GID:4, GE:Aggregation_121{GID:3}, logic prop:{stats:{count 2, ColNDVs map[7:2], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}",
+          "GID:5, GE:Projection_122{GID:4}, logic prop:{stats:{count 2, ColNDVs map[7:2], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(1) from t1 right join t2 on t1.a = t2.a group by t2.a, t2.b",
+        "Str": [
+          "GID:1, GE:DataSource_124{}, logic prop:{stats:{count 5, ColNDVs map[1:2], GroupNDVs []}, schema:{Column: [test.t1.a] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_125{}, logic prop:{stats:{count 10, ColNDVs map[4:3 5:3], GroupNDVs [{[4 5] 9}]}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:3, GE:Join_129{GID:1, GID:2}, logic prop:{stats:{count 16.666666666666668, ColNDVs map[1:2 4:3 5:3], GroupNDVs [{[4 5] 9}]}, schema:{Column: [test.t1.a,test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:4, GE:Aggregation_127{GID:3}, logic prop:{stats:{count 9, ColNDVs map[7:9], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}",
+          "GID:5, GE:Projection_128{GID:4}, logic prop:{stats:{count 9, ColNDVs map[7:9], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(tmp.cmp) from (select t1.a as a, t1.b as b, (t1.b in (select t2.b from t2 where t2.a > t1.a)) as cmp from t1) tmp group by tmp.a, tmp.b",
+        "Str": [
+          "GID:1, GE:DataSource_130{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_133{}, logic prop:{stats:{count 10, ColNDVs map[7:3 8:3], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:3, GE:Join_136{GID:1, GID:2}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2 10:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b,Column#10] Key: [] Unique key: []}}",
+          "GID:4, GE:Aggregation_137{GID:3}, logic prop:{stats:{count 4, ColNDVs map[11:4], GroupNDVs []}, schema:{Column: [Column#11] Key: [] Unique key: []}}",
+          "GID:5, GE:Projection_138{GID:4}, logic prop:{stats:{count 4, ColNDVs map[11:4], GroupNDVs []}, schema:{Column: [Column#11] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(tmp.cmp) from (select t1.a as a, t1.b as b, (t1.b not in (select t2.b from t2 where t2.a > t1.a)) as cmp from t1) tmp group by tmp.a, tmp.b",
+        "Str": [
+          "GID:1, GE:DataSource_139{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_142{}, logic prop:{stats:{count 10, ColNDVs map[7:3 8:3], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:3, GE:Join_145{GID:1, GID:2}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2 10:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b,Column#10] Key: [] Unique key: []}}",
+          "GID:4, GE:Aggregation_146{GID:3}, logic prop:{stats:{count 4, ColNDVs map[11:4], GroupNDVs []}, schema:{Column: [Column#11] Key: [] Unique key: []}}",
+          "GID:5, GE:Projection_147{GID:4}, logic prop:{stats:{count 4, ColNDVs map[11:4], GroupNDVs []}, schema:{Column: [Column#11] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(1) from (select t1.a as a, t1.b as b from t1 where t1.b in (select t2.b from t2 where t2.a > t1.a)) tmp group by tmp.a, tmp.b",
+        "Str": [
+          "GID:1, GE:DataSource_148{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_150{}, logic prop:{stats:{count 10, ColNDVs map[4:3 5:3], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:3, GE:Join_153{GID:1, GID:2}, logic prop:{stats:{count 4, ColNDVs map[1:1.6 2:1.6], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:4, GE:Aggregation_155{GID:3}, logic prop:{stats:{count 1.6, ColNDVs map[7:1.6], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}",
+          "GID:5, GE:Projection_156{GID:4}, logic prop:{stats:{count 1.6, ColNDVs map[7:1.6], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(1) from (select t1.a as a, t1.b as b from t1 where t1.b not in (select t2.b from t2 where t2.a > t1.a)) tmp group by tmp.a, tmp.b",
+        "Str": [
+          "GID:1, GE:DataSource_157{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_159{}, logic prop:{stats:{count 10, ColNDVs map[4:3 5:3], GroupNDVs []}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:3, GE:Join_162{GID:1, GID:2}, logic prop:{stats:{count 4, ColNDVs map[1:1.6 2:1.6], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:4, GE:Aggregation_164{GID:3}, logic prop:{stats:{count 1.6, ColNDVs map[7:1.6], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}",
+          "GID:5, GE:Projection_165{GID:4}, logic prop:{stats:{count 1.6, ColNDVs map[7:1.6], GroupNDVs []}, schema:{Column: [Column#7] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select * from t1 left join (select t2.a as a, t2.b as b, count(1) as cnt from t2 group by t2.a, t2.b) as tmp on t1.a = tmp.a and t1.b = tmp.b",
+        "Str": [
+          "GID:1, GE:DataSource_166{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:DataSource_167{}, logic prop:{stats:{count 10, ColNDVs map[4:3 5:3], GroupNDVs [{[4 5] 9}]}, schema:{Column: [test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:3, GE:Aggregation_168{GID:2}, logic prop:{stats:{count 9, ColNDVs map[4:9 5:9 7:9], GroupNDVs [{[4 5] 9}]}, schema:{Column: [Column#7,test.t2.a,test.t2.b] Key: [[test.t2.a,test.t2.b]] Unique key: []}}",
+          "GID:4, GE:Join_172{GID:1, GID:3}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2 4:5 5:5 7:5], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b,Column#7,test.t2.a,test.t2.b] Key: [] Unique key: []}}",
+          "GID:5, GE:Projection_171{GID:4}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2 4:5 5:5 7:5], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b,test.t2.a,test.t2.b,Column#7] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(1) from (select t1.a as a, t1.b as b from t1 limit 3) tmp group by tmp.a, tmp.b",
+        "Str": [
+          "GID:1, GE:DataSource_173{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:Limit_179{GID:1}, logic prop:{stats:{count 3, ColNDVs map[1:2 2:2], GroupNDVs []}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:3, GE:Aggregation_176{GID:2}, logic prop:{stats:{count 2, ColNDVs map[4:2], GroupNDVs []}, schema:{Column: [Column#4] Key: [] Unique key: []}}",
+          "GID:4, GE:Projection_177{GID:3}, logic prop:{stats:{count 2, ColNDVs map[4:2], GroupNDVs []}, schema:{Column: [Column#4] Key: [] Unique key: []}}"
+        ]
+      },
+      {
+        "SQL": "select count(tmp.a_sum) from (select t1.a as a, t1.b as b, sum(a) over() as a_sum from t1) tmp group by tmp.a, tmp.b",
+        "Str": [
+          "GID:1, GE:DataSource_180{}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b] Key: [] Unique key: []}}",
+          "GID:2, GE:Window_183{GID:1}, logic prop:{stats:{count 5, ColNDVs map[1:2 2:2 5:5], GroupNDVs [{[1 2] 4}]}, schema:{Column: [test.t1.a,test.t1.b,Column#5] Key: [] Unique key: []}}",
+          "GID:3, GE:Aggregation_185{GID:2}, logic prop:{stats:{count 4, ColNDVs map[6:4], GroupNDVs []}, schema:{Column: [Column#6] Key: [] Unique key: []}}",
+          "GID:4, GE:Projection_186{GID:3}, logic prop:{stats:{count 4, ColNDVs map[6:4], GroupNDVs []}, schema:{Column: [Column#6] Key: [] Unique key: []}}"
+        ]
+      }
+    ]
+  }
+]
diff --git a/pkg/planner/property/logical_property.go b/pkg/planner/property/logical_property.go
index e1e6156e156e0..64cc48485be0d 100644
--- a/pkg/planner/property/logical_property.go
+++ b/pkg/planner/property/logical_property.go
@@ -25,4 +25,9 @@ type LogicalProperty struct {
 	MaxOneRow bool
 }
 
+// NewLogicalProp returns a new empty LogicalProperty.
+func NewLogicalProp() *LogicalProperty {
+	return &LogicalProperty{}
+}
+
 // todo: ScalarProperty: usedColumns in current scalar expr, null reject, cor-related, subq contained and so on