Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(plan_node): simplify union nodes #10198

Merged
merged 1 commit into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions src/frontend/src/optimizer/plan_node/batch_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,21 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::UnionNode;

use super::{ExprRewritable, PlanRef, ToBatchPb, ToDistributedBatch};
use crate::optimizer::plan_node::{LogicalUnion, PlanBase, PlanTreeNode, ToLocalBatch};
use super::{generic, ExprRewritable, PlanRef, ToBatchPb, ToDistributedBatch};
use crate::optimizer::plan_node::{PlanBase, PlanTreeNode, ToLocalBatch};
use crate::optimizer::property::{Distribution, Order, RequiredDist};

/// `BatchUnion` implements [`super::LogicalUnion`]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchUnion {
pub base: PlanBase,
logical: LogicalUnion,
logical: generic::Union<PlanRef>,
}

impl BatchUnion {
pub fn new(logical: LogicalUnion) -> Self {
let ctx = logical.base.ctx.clone();

pub fn new(logical: generic::Union<PlanRef>) -> Self {
let dist = if logical
.inputs()
.inputs
.iter()
.all(|input| *input.distribution() == Distribution::Single)
{
Expand All @@ -43,7 +41,7 @@ impl BatchUnion {
Distribution::SomeShard
};

let base = PlanBase::new_batch(ctx, logical.schema().clone(), dist, Order::any());
let base = PlanBase::new_batch_from_logical(&logical, dist, Order::any());
BatchUnion { base, logical }
}
}
Expand All @@ -56,14 +54,14 @@ impl fmt::Display for BatchUnion {

impl PlanTreeNode for BatchUnion {
fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> {
let mut vec = smallvec::SmallVec::new();
vec.extend(self.logical.inputs().into_iter());
vec
smallvec::SmallVec::from_vec(self.logical.inputs.clone())
}

fn clone_with_inputs(&self, inputs: &[crate::optimizer::PlanRef]) -> PlanRef {
// For batch query, `source_col` is not needed, so it is enough to clone the core and swap in the
// new inputs.
Self::new(LogicalUnion::new(self.logical.all(), inputs.to_owned())).into()
let mut new = self.logical.clone();
new.inputs = inputs.to_vec();
Self::new(new).into()
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ impl ColPrunable for LogicalAgg {
let group_key_required_cols = self.group_key().clone();

let (agg_call_required_cols, agg_calls) = {
let input_cnt = self.input().schema().fields().len();
let input_cnt = self.input().schema().len();
let mut tmp = FixedBitSet::with_capacity(input_cnt);
let group_key_cardinality = self.group_key().count_ones(..);
let new_agg_calls = required_cols
Expand Down
19 changes: 15 additions & 4 deletions src/frontend/src/optimizer/plan_node/logical_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,16 @@ impl PredicatePushdown for LogicalUnion {

impl ToBatch for LogicalUnion {
fn to_batch(&self) -> Result<PlanRef> {
let new_inputs: Result<Vec<_>> =
self.inputs().iter().map(|input| input.to_batch()).collect();
let new_logical = Self::new(true, new_inputs?);
let new_inputs = self
.inputs()
.iter()
.map(|input| input.to_batch())
.try_collect()?;
let new_logical = generic::Union {
all: true,
inputs: new_inputs,
source_col: None,
};
// We still need to handle `!all` even though we already have `UnionToDistinctRule`, because such a
// union can be generated by index selection, which is an optimization performed during `to_batch`.
// Convert union to union all + agg.
Expand All @@ -152,7 +159,11 @@ impl ToStream for LogicalUnion {
.iter()
.map(|input| input.to_stream_with_dist_required(&dist, ctx))
.collect();
let new_logical = Self::new_with_source_col(true, new_inputs?, self.core.source_col);
let new_logical = generic::Union {
all: true,
inputs: new_inputs?,
..self.core
};
assert!(
self.all(),
"After UnionToDistinctRule, union should become union all"
Expand Down
45 changes: 16 additions & 29 deletions src/frontend/src/optimizer/plan_node/stream_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,38 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::UnionNode;

use super::utils::formatter_debug_plan_node;
use super::{ExprRewritable, PlanRef};
use super::{generic, ExprRewritable, PlanRef};
use crate::optimizer::plan_node::generic::GenericPlanNode;
use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::{LogicalUnion, PlanBase, PlanTreeNode, StreamNode};
use crate::optimizer::plan_node::{PlanBase, PlanTreeNode, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;

/// `StreamUnion` implements [`super::LogicalUnion`]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamUnion {
pub base: PlanBase,
logical: LogicalUnion,
logical: generic::Union<PlanRef>,
}

impl StreamUnion {
pub fn new(logical: LogicalUnion) -> Self {
let ctx = logical.base.ctx.clone();
let pk_indices = logical.base.logical_pk.to_vec();
let schema = logical.schema().clone();
let inputs = logical.inputs();
pub fn new(logical: generic::Union<PlanRef>) -> Self {
let inputs = &logical.inputs;
let dist = inputs[0].distribution().clone();
assert!(logical
.inputs()
.iter()
.all(|input| *input.distribution() == dist));
assert!(inputs.iter().all(|input| *input.distribution() == dist));
let watermark_columns = inputs.iter().fold(
{
let mut bitset = FixedBitSet::with_capacity(schema.len());
let mut bitset = FixedBitSet::with_capacity(logical.schema().len());
bitset.toggle_range(..);
bitset
},
|acc_watermark_columns, input| acc_watermark_columns.bitand(input.watermark_columns()),
);

let base = PlanBase::new_stream(
ctx,
schema,
pk_indices,
logical.functional_dependency().clone(),
let base = PlanBase::new_stream_with_logical(
&logical,
dist,
logical.inputs().iter().all(|x| x.append_only()),
logical.inputs().iter().all(|x| x.emit_on_window_close()),
inputs.iter().all(|x| x.append_only()),
inputs.iter().all(|x| x.emit_on_window_close()),
watermark_columns,
);
StreamUnion { base, logical }
Expand Down Expand Up @@ -91,18 +83,13 @@ impl fmt::Display for StreamUnion {

impl PlanTreeNode for StreamUnion {
fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> {
let mut vec = smallvec::SmallVec::new();
vec.extend(self.logical.inputs().into_iter());
vec
smallvec::SmallVec::from_vec(self.logical.inputs.clone())
}

fn clone_with_inputs(&self, inputs: &[crate::optimizer::PlanRef]) -> PlanRef {
Self::new(LogicalUnion::new_with_source_col(
self.logical.all(),
inputs.to_owned(),
self.logical.source_col(),
))
.into()
let mut new = self.logical.clone();
new.inputs = inputs.to_vec();
Self::new(new).into()
}
}

Expand Down