Skip to content

Commit

Permalink
feat(streaming): handle multiple edges with no-op fragment (#9320)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Apr 23, 2023
1 parent 92b7c7b commit 2f62af4
Show file tree
Hide file tree
Showing 8 changed files with 294 additions and 4 deletions.
3 changes: 3 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,8 @@ message DedupNode {
repeated uint32 dedup_column_indices = 2;
}

// Plan node for the no-op operator. It carries no fields; the corresponding
// executor forwards its input stream unchanged and is currently used to break
// multiple edges between two fragments in the fragment graph.
message NoOpNode {}

message StreamNode {
oneof node_body {
SourceNode source = 100;
Expand Down Expand Up @@ -581,6 +583,7 @@ message StreamNode {
BarrierRecvNode barrier_recv = 132;
ValuesNode values = 133;
DedupNode append_only_dedup = 134;
NoOpNode no_op = 135;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down
108 changes: 108 additions & 0 deletions src/frontend/planner_test/tests/testdata/share.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,111 @@
└─StreamProject { exprs: [id, _row_id] }
└─StreamRowIdGen { row_id_index: 10 }
└─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "extra", "_row_id"] }
- id: self_join_multiple_edges
sql: |
create table t (a int, b int);
with cte as (select a, sum(b) sum from t group by a) select count(*) from cte c1 join cte c2 on c1.a = c2.a;
stream_plan: |
StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [sum0(count)] }
└─StreamGlobalSimpleAgg { aggs: [sum0(count), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessLocalSimpleAgg { aggs: [count] }
└─StreamHashJoin { type: Inner, predicate: t.a = t.a, output: all }
├─StreamShare { id = 4 }
| └─StreamProject { exprs: [t.a] }
| └─StreamHashAgg { group_key: [t.a], aggs: [count] }
| └─StreamExchange { dist: HashShard(t.a) }
| └─StreamTableScan { table: t, columns: [t.a, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamShare { id = 4 }
└─StreamProject { exprs: [t.a] }
└─StreamHashAgg { group_key: [t.a], aggs: [count] }
└─StreamExchange { dist: HashShard(t.a) }
└─StreamTableScan { table: t, columns: [t.a, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: "NoCheck" }
├── materialized table: 4294967294
└── StreamProject { exprs: [sum0(count)] }
└── StreamGlobalSimpleAgg { aggs: [sum0(count), count] }
├── result table: 0
├── state tables: []
├── distinct tables: []
└── StreamExchange Single from 1
Fragment 1
StreamStatelessLocalSimpleAgg { aggs: [count] }
└── StreamHashJoin { type: Inner, predicate: t.a = t.a, output: all }
├── left table: 1
├── right table: 3
├── left degree table: 2
├── right degree table: 4
├── StreamExchange Hash([0]) from 2
└── StreamExchange Hash([0]) from 4
Fragment 2
StreamProject { exprs: [t.a] }
└── StreamHashAgg { group_key: [t.a], aggs: [count] }
├── result table: 5
├── state tables: []
├── distinct tables: []
└── StreamExchange Hash([0]) from 3
Fragment 3
Chain { table: t, columns: [t.a, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
├── Upstream
└── BatchPlanNode
Fragment 4
StreamNoOp
└── StreamExchange NoShuffle from 2
Table 0
├── columns: [ sum0(count), count ]
├── primary key: []
├── value indices: [ 0, 1 ]
├── distribution key: []
└── read pk prefix len hint: 0
Table 1
├── columns: [ t_a ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1
Table 2
├── columns: [ t_a, _degree ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1
Table 3
├── columns: [ t_a ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1
Table 4
├── columns: [ t_a, _degree ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1
Table 5
├── columns: [ t_a, count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1
Table 4294967294
├── columns: [ count ]
├── primary key: []
├── value indices: [ 0 ]
├── distribution key: []
└── read pk prefix len hint: 0
20 changes: 19 additions & 1 deletion src/frontend/src/stream_fragmenter/graph/fragment_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,18 @@ impl StreamFragmentGraph {
downstream_id: LocalFragmentId,
edge: StreamFragmentEdge,
) {
self.try_add_edge(upstream_id, downstream_id, edge).unwrap();
}

/// Try to link upstream to downstream in the graph.
///
/// If the edge between upstream and downstream already exists, return an error.
pub fn try_add_edge(
&mut self,
upstream_id: LocalFragmentId,
downstream_id: LocalFragmentId,
edge: StreamFragmentEdge,
) -> Result<(), String> {
let edge = StreamFragmentEdgeProto {
upstream_id,
downstream_id,
Expand All @@ -140,6 +152,12 @@ impl StreamFragmentGraph {

self.edges
.try_insert((upstream_id, downstream_id), edge)
.unwrap();
.map(|_| ())
.map_err(|e| {
format!(
"edge between {} and {} already exists: {}",
upstream_id, downstream_id, e
)
})
}
}
71 changes: 68 additions & 3 deletions src/frontend/src/stream_fragmenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::error::Result;
use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::{
DispatchStrategy, DispatcherType, ExchangeNode, FragmentTypeFlag,
DispatchStrategy, DispatcherType, ExchangeNode, FragmentTypeFlag, NoOpNode,
StreamFragmentGraph as StreamFragmentGraphProto, StreamNode,
};

Expand Down Expand Up @@ -297,15 +297,80 @@ fn build_fragment(
// Exchange node should have only one input.
let [input]: [_; 1] = std::mem::take(&mut child_node.input).try_into().unwrap();
let child_fragment = build_and_add_fragment(state, input)?;
state.fragment_graph.add_edge(

let result = state.fragment_graph.try_add_edge(
child_fragment.fragment_id,
current_fragment.fragment_id,
StreamFragmentEdge {
dispatch_strategy: exchange_node_strategy,
dispatch_strategy: exchange_node_strategy.clone(),
// Always use the exchange operator id as the link id.
link_id: child_node.operator_id,
},
);

// There may be multiple edges between the same two fragments, but the meta
// service and the compute node do not expect this. In this case, we manually
// insert a `NoOp` fragment between the two fragments.
if result.is_err() {
// Assign a new operator id for the `Exchange`, so we can distinguish it
// from duplicate edges and break the sharing.
child_node.operator_id = state.gen_operator_id() as u64;

// Take the upstream plan node as the reference for properties of `NoOp`.
let ref_fragment_node = child_fragment.node.as_ref().unwrap();
let no_shuffle_strategy = DispatchStrategy {
r#type: DispatcherType::NoShuffle as i32,
dist_key_indices: vec![],
output_indices: (0..ref_fragment_node.fields.len() as u32).collect(),
};

let no_op_operator_id = state.gen_operator_id() as u64;
let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;

let no_op_fragment = {
let node = StreamNode {
operator_id: no_op_operator_id,
identity: "StreamNoOp".into(),
node_body: Some(NodeBody::NoOp(NoOpNode {})),
input: vec![StreamNode {
operator_id: no_shuffle_exchange_operator_id,
identity: "StreamNoShuffleExchange".into(),
node_body: Some(NodeBody::Exchange(ExchangeNode {
strategy: Some(no_shuffle_strategy.clone()),
})),
input: vec![],
..*ref_fragment_node.clone()
}],
..*ref_fragment_node.clone()
};

let mut fragment = state.new_stream_fragment();
fragment.node = Some(node.into());
Rc::new(fragment)
};

state.fragment_graph.add_fragment(no_op_fragment.clone());

state.fragment_graph.add_edge(
child_fragment.fragment_id,
no_op_fragment.fragment_id,
StreamFragmentEdge {
// Use `NoShuffle` exchange strategy for upstream edge.
dispatch_strategy: no_shuffle_strategy,
link_id: no_shuffle_exchange_operator_id,
},
);
state.fragment_graph.add_edge(
no_op_fragment.fragment_id,
current_fragment.fragment_id,
StreamFragmentEdge {
// Use the original exchange strategy for downstream edge.
dispatch_strategy: exchange_node_strategy,
link_id: child_node.operator_id,
},
);
}

Ok(child_node)
}

Expand Down
2 changes: 2 additions & 0 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ mod lookup_union;
mod managed_state;
mod merge;
mod mview;
mod no_op;
mod now;
mod over_window;
mod project;
Expand Down Expand Up @@ -125,6 +126,7 @@ pub use lookup::*;
pub use lookup_union::LookupUnionExecutor;
pub use merge::MergeExecutor;
pub use mview::*;
pub use no_op::NoOpExecutor;
pub use now::NowExecutor;
pub use project::ProjectExecutor;
pub use project_set::*;
Expand Down
53 changes: 53 additions & 0 deletions src/stream/src/executor/no_op.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::Schema;

use super::{ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor, PkIndicesRef};

/// No-op executor directly forwards the input stream. Currently used to break the multiple edges in
/// the fragment graph.
///
/// Schema and primary-key indices are taken from the input executor as well, so this operator
/// reports the same output shape as its upstream.
pub struct NoOpExecutor {
// Actor context handed in at construction; not read by any method visible in this file.
_ctx: ActorContextRef,
// String returned by `Executor::identity`; built once in `new` from the executor id.
identity: String,
// The single upstream executor whose stream, schema and pk indices are forwarded.
input: BoxedExecutor,
}

impl NoOpExecutor {
    /// Creates a no-op executor wrapping `input`.
    ///
    /// * `ctx` - actor context; stored on the executor.
    /// * `input` - the upstream executor whose output is forwarded unchanged.
    /// * `executor_id` - rendered in upper-case hexadecimal as part of the identity string.
    pub fn new(ctx: ActorContextRef, input: BoxedExecutor, executor_id: u64) -> Self {
        Self {
            _ctx: ctx,
            // The identity must name this executor. It previously read
            // "BarrierRecvExecutor" (a copy-paste slip), which mislabeled the
            // operator wherever `identity()` is reported.
            identity: format!("NoOpExecutor {:X}", executor_id),
            input,
        }
    }
}

impl Executor for NoOpExecutor {
    /// Consumes this executor and returns the upstream's message stream as-is.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        let upstream = self.input;
        upstream.execute()
    }

    /// Schema of the upstream executor; this operator does not alter columns.
    fn schema(&self) -> &Schema {
        let upstream = &self.input;
        upstream.schema()
    }

    /// Primary-key indices of the upstream executor.
    fn pk_indices(&self) -> PkIndicesRef<'_> {
        let upstream = &self.input;
        upstream.pk_indices()
    }

    /// Identity string assigned when the executor was constructed.
    fn identity(&self) -> &str {
        self.identity.as_str()
    }
}
3 changes: 3 additions & 0 deletions src/stream/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mod lookup;
mod lookup_union;
mod merge;
mod mview;
mod no_op;
mod now;
mod project;
mod project_set;
Expand Down Expand Up @@ -73,6 +74,7 @@ use self::lookup::*;
use self::lookup_union::*;
use self::merge::*;
use self::mview::*;
use self::no_op::*;
use self::now::NowExecutorBuilder;
use self::project::*;
use self::project_set::*;
Expand Down Expand Up @@ -161,5 +163,6 @@ pub async fn create_executor(
NodeBody::Values => ValuesExecutorBuilder,
NodeBody::BarrierRecv => BarrierRecvExecutorBuilder,
NodeBody::AppendOnlyDedup => AppendOnlyDedupExecutorBuilder,
NodeBody::NoOp => NoOpExecutorBuilder,
}
}
38 changes: 38 additions & 0 deletions src/stream/src/from_proto/no_op.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::stream_plan::NoOpNode;
use risingwave_storage::StateStore;

use super::ExecutorBuilder;
use crate::error::StreamResult;
use crate::executor::{BoxedExecutor, Executor, NoOpExecutor};
use crate::task::{ExecutorParams, LocalStreamManagerCore};

/// Builder that constructs a [`NoOpExecutor`] from a `NoOpNode` plan node.
pub struct NoOpExecutorBuilder;

#[async_trait::async_trait]
impl ExecutorBuilder for NoOpExecutorBuilder {
    type Node = NoOpNode;

    /// Builds a boxed [`NoOpExecutor`] over the node's single input.
    ///
    /// The plan node, state store and stream manager are not consulted.
    ///
    /// # Panics
    ///
    /// Panics if `params.input` does not contain exactly one executor.
    async fn new_boxed_executor(
        params: ExecutorParams,
        _node: &NoOpNode,
        _store: impl StateStore,
        _stream: &mut LocalStreamManagerCore,
    ) -> StreamResult<BoxedExecutor> {
        // A no-op node has exactly one upstream; anything else is a malformed plan.
        let [upstream]: [_; 1] = params.input.try_into().unwrap();
        let executor = NoOpExecutor::new(params.actor_context, upstream, params.executor_id);
        Ok(executor.boxed())
    }
}

0 comments on commit 2f62af4

Please sign in to comment.