From aa5db4ea35486fbe32d7ead636f7f80bf4e258ec Mon Sep 17 00:00:00 2001
From: Hauke Strasdat <strasdat@gmail.com>
Date: Sun, 28 Jan 2024 16:04:24 -0800
Subject: [PATCH] feat: zip & nudge actor, proc-macro improvements

---
 Cargo.toml                                 |  15 +-
 examples/moving_average.rs                 |   4 +-
 examples/nudge.rs                          |  34 ++
 examples/one_dim_robot.rs                  |  57 ++-
 examples/print_ticks.rs                    |  10 +-
 examples/zip.rs                            |  55 ++
 hollywood_macros/Cargo.toml                |  12 +-
 hollywood_macros/src/actors.rs             |   1 +
 hollywood_macros/src/actors/zip.rs         | 563 +++++++++++++++++++++
 hollywood_macros/src/core.rs               | 492 ++++++++++++++++++
 hollywood_macros/src/lib.rs                | 388 +++-----------
 src/actors.rs                              |  22 +
 src/actors/nudge.rs                        | 153 ++++++
 src/actors/periodic.rs                     |   4 +-
 src/actors/printer.rs                      |  60 +--
 src/actors/zip.rs                          |  62 +++
 src/compute.rs                             |   6 +-
 src/compute/context.rs                     |  16 +-
 src/compute/pipeline.rs                    |   4 +-
 src/compute/topology.rs                    |   7 +-
 src/core.rs                                |   7 +-
 src/core/actor.rs                          |   2 +-
 src/core/actor_builder.rs                  |  11 +-
 src/core/connection/outbound_connection.rs |   7 +-
 src/core/connection/request_connection.rs  |   4 +-
 src/core/outbound.rs                       | 102 +++-
 src/core/request.rs                        |  28 +-
 src/core/runner.rs                         |   2 -
 src/core/value.rs                          |   2 -
 src/example_actors.rs                      |   4 +-
 src/example_actors/moving_average.rs       |   4 +-
 src/example_actors/one_dim_robot.rs        |   4 +-
 src/example_actors/one_dim_robot/draw.rs   | 212 +++-----
 src/example_actors/one_dim_robot/filter.rs |  90 +++-
 src/example_actors/one_dim_robot/model.rs  |   5 +-
 src/example_actors/one_dim_robot/sim.rs    |  78 +--
 src/introspect.rs                          |   1 -
 src/introspect/flow_graph.rs               |   5 +-
 src/lib.rs                                 |  60 ++-
 39 files changed, 1880 insertions(+), 713 deletions(-)
 create mode 100644 examples/nudge.rs
 create mode 100644 examples/zip.rs
 create mode 100644 hollywood_macros/src/actors.rs
 create mode 100644 hollywood_macros/src/actors/zip.rs
 create mode 100644 hollywood_macros/src/core.rs
 create mode 100644 src/actors/nudge.rs
 create mode 100644 src/actors/zip.rs

diff --git a/Cargo.toml b/Cargo.toml
index 0abd498..e09bad0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,14 +4,14 @@ name = "hollywood"
 description = "hollywood actor framework"
 edition = "2021"
 include = [
-    "**/*.rs",
-    "Cargo.toml",
+  "**/*.rs",
+  "Cargo.toml",
 ]
-license = "Apache-2.0"
 keywords = ["actor", "compute", "graph", "pipeline"]
+license = "Apache-2.0"
 readme = "README.md"
 repository = "https://github.com/farm-ng/hollywood/"
-version = "0.3.0"
+version = "0.4.0"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
@@ -19,10 +19,13 @@ version = "0.3.0"
 async-trait = "0.1.51"
 drawille = "0.3.0"
 grid = "0.13.0"
-hollywood_macros = { version = "0.3.0", path = "hollywood_macros" }
+hollywood_macros = {version = "0.4.0", path = "hollywood_macros"}
+# hollywood intends to use only very basic features of nalgebra, hence 
+# future versions of nalgebra before the major < 1.0 release are likely to work
+nalgebra = ">= 0.32, <1.0"
 petgraph = "0.6.3"
 rand = "0.8.4"
 rand_distr = "0.4.3"
 # executor feature needed
-tokio = { version = "1.28.0", features = ["full"] }
+tokio = {version = "1.28.0", features = ["full"]}
 tokio-stream = "0.1.14"
diff --git a/examples/moving_average.rs b/examples/moving_average.rs
index bccd965..e614b5b 100644
--- a/examples/moving_average.rs
+++ b/examples/moving_average.rs
@@ -3,7 +3,9 @@ use hollywood::actors::{Periodic, Printer};
 use hollywood::compute::Context;
 use hollywood::core::{FromPropState, NullState};
 
-use hollywood::example_actors::moving_average::{MovingAverage, MovingAverageProp, MovingAverageState};
+use hollywood::example_actors::moving_average::{
+    MovingAverage, MovingAverageProp, MovingAverageState,
+};
 
 ///
 pub async fn run_moving_average_example() {
diff --git a/examples/nudge.rs b/examples/nudge.rs
new file mode 100644
index 0000000..0d1eb83
--- /dev/null
+++ b/examples/nudge.rs
@@ -0,0 +1,34 @@
+use hollywood::actors::printer::PrinterProp;
+use hollywood::actors::{Nudge, Printer};
+use hollywood::compute::Context;
+use hollywood::core::*;
+
+pub async fn run_tick_print_example() {
+    let pipeline = Context::configure(&mut |context| {
+        let mut nudge = Nudge::<String>::new(context, "nudge".to_owned());
+        let mut nudge_printer = Printer::<String>::from_prop_and_state(
+            context,
+            PrinterProp {
+                topic: "nudge: ".to_string(),
+            },
+            NullState::default(),
+        );
+        nudge
+            .outbound
+            .nudge
+            .connect(context, &mut nudge_printer.inbound.printable);
+    });
+
+    pipeline.print_flow_graph();
+    pipeline.run().await;
+}
+
+fn main() {
+    tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .unwrap()
+        .block_on(async {
+            run_tick_print_example().await;
+        })
+}
diff --git a/examples/one_dim_robot.rs b/examples/one_dim_robot.rs
index 46aeaa6..68fed1a 100644
--- a/examples/one_dim_robot.rs
+++ b/examples/one_dim_robot.rs
@@ -1,9 +1,11 @@
 use hollywood::actors::printer::PrinterProp;
+use hollywood::actors::zip::ZipPair;
 use hollywood::actors::Periodic;
 use hollywood::actors::Printer;
+use hollywood::actors::Zip3;
 use hollywood::compute::Context;
 use hollywood::core::*;
-use hollywood::example_actors::one_dim_robot::draw::DrawState;
+
 use hollywood::example_actors::one_dim_robot::filter::FilterState;
 use hollywood::example_actors::one_dim_robot::{
     DrawActor, Filter, NamedFilterState, Robot, Sim, SimState, Stamped,
@@ -11,13 +13,14 @@ use hollywood::example_actors::one_dim_robot::{
 
 async fn run_robot_example() {
     let pipeline = Context::configure(&mut |context| {
-        let mut timer = Periodic::new_with_period(context, 0.25);
+        let mut timer = Periodic::new_with_period(context, 0.1);
         let mut sim = Sim::from_prop_and_state(
             context,
             NullProp {},
             SimState {
-                shutdown_time: 10.0,
+                shutdown_time: 15.0,
                 time: 0.0,
+                seq: 0,
                 true_robot: Robot {
                     position: -2.0,
                     velocity: 0.4,
@@ -40,8 +43,9 @@ async fn run_robot_example() {
             NullState::default(),
         );
 
-        let mut draw_actor =
-            DrawActor::from_prop_and_state(context, NullProp {}, DrawState::default());
+        let mut zip = Zip3::new_default_init_state(context, NullProp {});
+
+        let mut draw = DrawActor::new_default_init_state(context, NullProp {});
 
         timer
             .outbound
@@ -54,28 +58,47 @@ async fn run_robot_example() {
         sim.outbound
             .noisy_range
             .connect(context, &mut filter.inbound.noisy_range);
-        sim.outbound
-            .true_robot
-            .connect(context, &mut draw_actor.inbound.true_pos);
-        sim.outbound
-            .true_range
-            .connect(context, &mut draw_actor.inbound.true_range);
+        sim.outbound.true_robot.connect_with_adapter(
+            context,
+            |x| ZipPair {
+                key: x.seq,
+                value: x,
+            },
+            &mut zip.inbound.item0,
+        );
+        sim.outbound.true_range.connect_with_adapter(
+            context,
+            |x| ZipPair {
+                key: x.seq,
+                value: x,
+            },
+            &mut zip.inbound.item1,
+        );
         sim.outbound
             .true_robot
             .connect(context, &mut truth_printer.inbound.printable);
 
-
-        sim.request.ping_pong.connect(context, &mut filter.inbound.ping_pong_request);
+        sim.request
+            .ping_pong
+            .connect(context, &mut filter.inbound.ping_pong_request);
         context.register_cancel_requester(&mut sim.outbound.cancel_request);
 
         filter
             .outbound
             .updated_state
             .connect(context, &mut filter_state_printer.inbound.printable);
-        filter
-            .outbound
-            .updated_state
-            .connect(context, &mut draw_actor.inbound.filter_est);
+        filter.outbound.updated_state.connect_with_adapter(
+            context,
+            |x| ZipPair {
+                key: x.state.seq,
+                value: x,
+            },
+            &mut zip.inbound.item2,
+        );
+
+        zip.outbound
+            .zipped
+            .connect(context, &mut draw.inbound.zipped);
     });
 
     pipeline.print_flow_graph();
diff --git a/examples/print_ticks.rs b/examples/print_ticks.rs
index 02f2164..3a2a62f 100644
--- a/examples/print_ticks.rs
+++ b/examples/print_ticks.rs
@@ -14,11 +14,11 @@ pub async fn run_tick_print_example() {
             },
             NullState::default(),
         );
-        timer
-            .outbound
-            .time_stamp
-            .connect(context, &mut time_printer.inbound.printable);
-
+        timer.outbound.time_stamp.connect_with_adapter(
+            context,
+            |t| 10.0 * t,
+            &mut time_printer.inbound.printable,
+        );
     });
 
     pipeline.print_flow_graph();
diff --git a/examples/zip.rs b/examples/zip.rs
new file mode 100644
index 0000000..a827b8b
--- /dev/null
+++ b/examples/zip.rs
@@ -0,0 +1,55 @@
+use hollywood::actors::printer::PrinterProp;
+use hollywood::actors::zip::{Tuple2, ZipPair};
+use hollywood::actors::{periodic, Printer, Zip2};
+use hollywood::compute::Context;
+use hollywood::core::*;
+
+pub async fn run_tick_print_example() {
+    let pipeline = Context::configure(&mut |context| {
+        let mut periodic = periodic::Periodic::new_with_period(context, 1.0);
+
+        let mut zip =
+            Zip2::<u64, String, String>::new_default_init_state(context, NullProp::default());
+        let mut printer = Printer::<Tuple2<u64, String, String>>::from_prop_and_state(
+            context,
+            PrinterProp {
+                topic: "zipped".to_string(),
+            },
+            NullState::default(),
+        );
+
+        periodic.outbound.time_stamp.connect_with_adapter(
+            context,
+            |t| ZipPair {
+                key: t as u64,
+                value: "hello".to_string(),
+            },
+            &mut zip.inbound.item0,
+        );
+        periodic.outbound.time_stamp.connect_with_adapter(
+            context,
+            |t| ZipPair {
+                key: 2 * t as u64,
+                value: "world".to_string(),
+            },
+            &mut zip.inbound.item1,
+        );
+
+        zip.outbound
+            .zipped
+            .connect(context, &mut printer.inbound.printable);
+    });
+
+    pipeline.print_flow_graph();
+    pipeline.run().await;
+}
+
+fn main() {
+    tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .unwrap()
+        .block_on(async {
+            run_tick_print_example().await;
+        })
+}
diff --git a/hollywood_macros/Cargo.toml b/hollywood_macros/Cargo.toml
index dd76754..a8d0f68 100644
--- a/hollywood_macros/Cargo.toml
+++ b/hollywood_macros/Cargo.toml
@@ -4,14 +4,14 @@ name = "hollywood_macros"
 description = "Macros for the Hollywood actor framework"
 edition = "2021"
 include = [
-    "**/*.rs",
-    "Cargo.toml",
+  "**/*.rs",
+  "Cargo.toml",
 ]
-license = "Apache-2.0"
 keywords = ["actor", "compute", "graph", "pipeline"]
+license = "Apache-2.0"
 readme = "../README.md"
 repository = "https://github.com/farm-ng/hollywood/tree/main/hollywood_macros"
-version = "0.3.0"
+version = "0.4.0"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 [lib]
@@ -19,6 +19,6 @@ proc-macro = true
 
 [dependencies]
 convert_case = "0.6.0"
+proc-macro2 = "1.0"
 quote = "1.0.9"
-syn = { version = "2.0.18", features = ["full"] }
-
+syn = {version = "2.0.18", features = ["full"]}
diff --git a/hollywood_macros/src/actors.rs b/hollywood_macros/src/actors.rs
new file mode 100644
index 0000000..d8390f9
--- /dev/null
+++ b/hollywood_macros/src/actors.rs
@@ -0,0 +1 @@
+pub mod zip;
\ No newline at end of file
diff --git a/hollywood_macros/src/actors/zip.rs b/hollywood_macros/src/actors/zip.rs
new file mode 100644
index 0000000..e46a423
--- /dev/null
+++ b/hollywood_macros/src/actors/zip.rs
@@ -0,0 +1,563 @@
+use proc_macro2::TokenStream;
+use quote::{format_ident, quote};
+use syn::{parse::Parse, parse::ParseStream, parse2, LitInt, Result};
+
+struct ZipInput {
+    num_fields: usize,
+}
+
+impl Parse for ZipInput {
+    fn parse(input: ParseStream) -> Result<Self> {
+        let num_fields: LitInt = input.parse()?;
+        Ok(ZipInput {
+            num_fields: num_fields.base10_parse()?,
+        })
+    }
+}
+
+pub(crate) fn tuple_n_impl(input: TokenStream) -> TokenStream {
+    let ZipInput { num_fields } = match parse2(input) {
+        Ok(input) => input,
+        Err(err) => return TokenStream::from(err.to_compile_error()),
+    };
+
+    let tuple_struct = format_ident!("Tuple{}", num_fields);
+
+    let field_seq = (0..num_fields).map(|i| format_ident!("item{}", i));
+    let field_seq2 = field_seq.clone();
+    let field_seq3 = field_seq.clone();
+
+    let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i));
+    let type_seq2 = type_seq.clone();
+    let type_seq3 = type_seq.clone();
+    let type_with_bounds_seq = (0..num_fields).map(|i| {
+        let ident = format_ident!("Item{}", i);
+        quote! { #ident: Default + Clone + Debug + Sync + Send + 'static
+        + std::marker::Sync+ std::marker::Send}
+    });
+
+    let expanded = quote! {
+        #[derive(Default, Clone, Debug)]
+
+        /// A tuple struct X fields.
+        ///
+        /// Used to send merged items from X inbound channels to one outbound channel.
+        pub struct #tuple_struct<Key:Default+Clone+Debug, #( #type_seq ),*> {
+            /// Key to associate message from different inbound channels with.
+            pub key: Key,
+            #(
+                /// The value to be zipped.
+                pub #field_seq: #type_seq3
+            ),*
+        }
+        impl<
+                Key: Default
+                    + Debug
+                    + Clone
+                    + Display
+                    + PartialEq
+                    + Eq
+                    + PartialOrd
+                    + Ord
+                    + Sync
+                    + Send
+                    + 'static,
+                    #( #type_with_bounds_seq ),*
+            > Display for #tuple_struct<Key, #( #type_seq2 ),*>
+        {
+            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                write!(
+                    f,
+                    concat!( "key: {}", #( stringify!(, #field_seq2), ": {:?}" ),* ),
+                    self.key,  #( self.#field_seq3), *)
+            }
+        }
+    };
+
+    TokenStream::from(expanded)
+}
+
+pub(crate) fn zip_outbound_n_impl(input: TokenStream) -> TokenStream {
+    let ZipInput { num_fields } = match parse2(input) {
+        Ok(input) => input,
+        Err(err) => return TokenStream::from(err.to_compile_error()),
+    };
+
+    let outbound_struct = format_ident!("Zip{}Outbound", num_fields);
+    let tuple_struct = format_ident!("Tuple{}", num_fields);
+
+    let params_seq = (0..num_fields).map(|i| format_ident!("Item{}", i));
+    let params_seq2 = params_seq.clone();
+    let params_seq3 = params_seq.clone();
+    let params_seq4 = params_seq.clone();
+    let params_seq5 = params_seq.clone();
+
+    let params_with_bounds_seq: Vec<_> = (0..num_fields)
+        .map(|i| {
+            let ident = format_ident!("Item{}", i);
+            quote! { #ident: Default + Clone + Debug + Sync + Send
+            + 'static +std::marker::Sync+ std::marker::Send}
+        })
+        .collect();
+    let params_with_bounds_seq2 = params_with_bounds_seq.clone();
+    let params_with_bounds_seq3 = params_with_bounds_seq.clone();
+
+    let expanded = quote! {
+        /// ZipX outbound hub
+        ///
+        /// Contains one outbound channel of the merged inbound channels.
+        pub struct #outbound_struct<Key:Default + Debug + Clone + Sync + Send
+                                + 'static , #( #params_with_bounds_seq ),*> {
+            /// Outbound channel of the merged inbound channels.
+            pub zipped: OutboundChannel<#tuple_struct<Key, #( #params_seq2 ),*>>,
+        }
+
+        impl<Key: Default +  Debug+Clone+Sync + Send + 'static, #( #params_with_bounds_seq2 ),*>
+            Activate for #outbound_struct<Key, #( #params_seq3 ),*>
+        {
+            fn extract(&mut self) -> Self {
+                Self {
+                    zipped: self.zipped.extract(),
+                }
+            }
+
+            fn activate(&mut self) {
+                self.zipped.activate();
+            }
+        }
+
+        impl<Key:  Default + Debug+Clone+Sync + Send + 'static, #( #params_with_bounds_seq3 ),*>
+            OutboundHub for #outbound_struct<Key, #( #params_seq4 ),*>
+        {
+            fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self {
+                Self {
+                    zipped: OutboundChannel::<#tuple_struct<Key, #( #params_seq5 ),*>>::new(
+                        context,
+                        "zipped".to_owned(),
+                        actor_name,
+                    ),
+                }
+            }
+        }
+    };
+
+    TokenStream::from(expanded)
+}
+
+pub(crate) fn zip_state_n_impl(input: TokenStream) -> TokenStream {
+    let ZipInput { num_fields } = match parse2(input) {
+        Ok(input) => input,
+        Err(err) => return TokenStream::from(err.to_compile_error()),
+    };
+
+    let state_struct = format_ident!("Zip{}State", num_fields);
+
+    let heap_item_seq = (0..num_fields).map(|i| {
+        let heap_item = format_ident!("item{}_heap", i);
+        let item = format_ident!("Item{}", i); // Corrected this line
+        let pair = quote! { ZipPair<#i, Key, #item> };
+        quote! { 
+            /// Heap for the Xth inbound channel.
+            pub #heap_item: std::collections::BinaryHeap<std::cmp::Reverse<#pair>> 
+        }
+    });
+
+    let item_seq = (0..num_fields).map(|i| format_ident!("Item{}", i));
+
+    let expanded = quote! {
+
+        /// State of the zip actor with X inbound channels.
+        #[derive(Clone, Debug, Default)]
+        pub struct #state_struct<Key: PartialEq + Eq + PartialOrd + Ord,
+                                #( #item_seq: Default + Clone + Debug + Sync + Send + 'static ),*>
+        {
+            #( #heap_item_seq ),*
+        }
+    };
+
+    TokenStream::from(expanded)
+}
+
+pub(crate) fn zip_inbound_message_n_impl(input: TokenStream) -> TokenStream {
+    let ZipInput { num_fields } = match parse2(input) {
+        Ok(input) => input,
+        Err(err) => return TokenStream::from(err.to_compile_error()),
+    };
+
+    let inbound_message_enum = format_ident!("Zip{}InboundMessage", num_fields);
+    let state_struct = format_ident!("Zip{}State", num_fields);
+    let outbound_struct = format_ident!("Zip{}Outbound", num_fields);
+
+    let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i));
+    let type_seq2 = type_seq.clone();
+    let type_seq3 = type_seq.clone();
+    let type_seq4 = type_seq.clone();
+    let type_seq5 = type_seq.clone();
+    let type_seq6 = type_seq.clone();
+    let type_with_bounds_seq: Vec<_> = (0..num_fields)
+        .map(|i| {
+            let ident = format_ident!("Item{}", i);
+            quote! { #ident: Default + Clone + Debug + Sync + Send
+            + 'static}
+        })
+        .collect();
+
+    let i_seq: Vec<_> = (0..num_fields)
+        .map(|i| {
+            quote! { #i }
+        })
+        .collect();
+
+    let msg_new_impl_seq = (0..num_fields).map(|i| {
+        let item = format_ident!("Item{}", i);
+        let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i));
+
+        quote! {
+                   impl<
+                   Key: Default
+                       + Clone
+                       + Debug
+                       + PartialEq
+                       + Eq
+                       + PartialOrd
+                       + Ord
+                       + Debug
+                       + Clone
+                       + Sync
+                       + Send
+                       + 'static,
+                       #( #type_with_bounds_seq ),*
+                       > InboundMessageNew<ZipPair<#i, Key, #item>>
+                       for #inbound_message_enum<Key, #( #type_seq ),*>
+                   {
+                       fn new(_inbound_name: String, msg: ZipPair<#i, Key, #item>) -> Self {
+                           #inbound_message_enum::#item(msg)
+                       }
+                   }
+        }
+    });
+
+    let expand = quote! {
+
+        /// Inbound message for the zip actor.
+        #[derive(Clone,Debug)]
+        pub enum #inbound_message_enum<
+            Key: Ord + Clone + Debug + Sync + Send + 'static,
+            #( #type_with_bounds_seq ),*
+        > {
+            #(  
+                /// Inbound message for the Xth inbound channel.
+                #type_seq(ZipPair<#i_seq, Key, #type_seq>)
+            ),*
+        }
+
+       #(#msg_new_impl_seq)*
+
+       impl<
+                Key: Default + Clone + Debug + PartialEq + Eq + PartialOrd + Ord
+                + Sync + Send + 'static,
+                #( #type_with_bounds_seq ),*
+            > InboundMessage for  #inbound_message_enum<Key, #(#type_seq2),*>
+        {
+            type Prop = NullProp;
+            type State = #state_struct<Key, #(#type_seq3),*>;
+            type OutboundHub = #outbound_struct<Key, #(#type_seq4),*>;
+            type RequestHub = NullRequest;
+
+            fn inbound_channel(&self) -> String {
+                match self {
+                    #( #inbound_message_enum::#type_seq5(_) =>  stringify!(#type_seq6).to_owned(), )*
+                }
+            }
+        }
+    };
+
+    TokenStream::from(expand)
+}
+
+pub(crate) fn zip_n_impl(input: TokenStream) -> TokenStream {
+    let ZipInput { num_fields } = match parse2(input) {
+        Ok(input) => input,
+        Err(err) => return TokenStream::from(err.to_compile_error()),
+    };
+
+    let zip_struct = format_ident!("Zip{}", num_fields);
+    let state_struct = format_ident!("Zip{}State", num_fields);
+    let inbound_struct = format_ident!("Zip{}Inbound", num_fields);
+    let outbound_struct = format_ident!("Zip{}Outbound", num_fields);
+    let inbound_message_enum = format_ident!("Zip{}InboundMessage", num_fields);
+
+    let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i));
+    let type_seq2 = type_seq.clone();
+    let type_seq3 = type_seq.clone();
+    let type_seq4 = type_seq.clone();
+    let type_seq5 = type_seq.clone();
+    let type_seq6 = type_seq.clone();
+    let type_seq7 = type_seq.clone();
+    let type_seq8 = type_seq.clone();
+    let type_seq9 = type_seq.clone();
+    let type_seq10 = type_seq.clone();
+    let type_seq11 = type_seq.clone();
+    let type_seq12 = type_seq.clone();
+
+    let type_with_bounds_seq: Vec<_> = (0..num_fields)
+        .map(|i| {
+            let item_type = format_ident!("Item{}", i);
+            quote! { #item_type: Default + Clone + Debug + Sync + Send
+            + 'static}
+        })
+        .collect();
+
+    let expanded = quote! {
+
+        /// ZipX actor, which zips X inbound channels into one outbound channel.
+        pub type #zip_struct<Key, #( #type_seq), *> = Actor<
+                NullProp,
+                #inbound_struct<Key, #( #type_seq2), *>,
+                #state_struct<Key, #( #type_seq3), *>,
+                #outbound_struct<Key, #( #type_seq4), *>,
+                NullRequest,
+            >;
+
+        impl<
+                Key: Default + Clone + Debug + PartialEq + Eq + PartialOrd + Ord
+                     + Sync + Send + 'static,
+                #( #type_with_bounds_seq ),*
+            >
+            FromPropState<
+                NullProp,
+                #inbound_struct<Key, #( #type_seq5), *>,
+                #state_struct<Key, #( #type_seq6), *>,
+                #outbound_struct<Key, #( #type_seq7), *>,
+                #inbound_message_enum<Key, #( #type_seq8), *>,
+                NullRequest,
+                DefaultRunner<
+                    NullProp,
+                    #inbound_struct<Key, #( #type_seq9), *>,
+                    #state_struct<Key, #( #type_seq10), *>,
+                    #outbound_struct<Key, #( #type_seq11), *>,
+                    NullRequest,
+                >,
+            > for #zip_struct<Key, #( #type_seq12), *>
+        {
+            fn name_hint(_prop: &NullProp) -> String {
+                stringify!(#zip_struct).to_owned()
+            }
+        }
+
+    };
+
+    TokenStream::from(expanded)
+}
+
+pub(crate) fn zip_inbound_n_impl(input: TokenStream) -> TokenStream {
+    let ZipInput { num_fields } = match parse2(input) {
+        Ok(input) => input,
+        Err(err) => return TokenStream::from(err.to_compile_error()),
+    };
+
+    let inbound_struct = format_ident!("Zip{}Inbound", num_fields);
+    let state_struct = format_ident!("Zip{}State", num_fields);
+    let outbound_struct = format_ident!("Zip{}Outbound", num_fields);
+    let inbound_message_enum = format_ident!("Zip{}InboundMessage", num_fields);
+
+    let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i));
+    let type_seq4 = type_seq.clone();
+    let type_seq5 = type_seq.clone();
+    let type_seq6 = type_seq.clone();
+    let type_seq7 = type_seq.clone();
+    let type_seq8 = type_seq.clone();
+    let type_seq9 = type_seq.clone();
+    let type_seq10 = type_seq.clone();
+    let type_seq12 = type_seq.clone();
+
+    let item_seq = (0..num_fields).map(|i| format_ident!("item{}", i));
+    let item_seq2 = item_seq.clone();
+    let item_seq3 = item_seq.clone();
+    let item_seq4 = item_seq.clone();
+    let item_seq5 = item_seq.clone();
+
+    let type_with_bounds_seq: Vec<_> = (0..num_fields)
+        .map(|i| {
+            let ident = format_ident!("Item{}", i);
+            quote! { #ident: Default + Clone + Debug + Sync + Send
+            + 'static}
+        })
+        .collect();
+
+    let channel: Vec<_> = (0..num_fields)
+        .map(|i| {
+            let item_type = format_ident!("Item{}", i);
+            let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i));
+
+            quote! {
+                InboundChannel<ZipPair<#i, Key, #item_type>,
+                              #inbound_message_enum<Key, #( #type_seq),*>>
+            }
+        })
+        .collect();
+
+    let expanded = quote! {
+
+        /// Inbound hub for the zip actor.
+        #[derive(Clone,Debug)]
+        pub struct #inbound_struct<
+            Key: Default + Clone + Debug + PartialEq + Eq + PartialOrd + Ord
+                 + Sync + Send + 'static,
+            #( #type_with_bounds_seq ),*
+        > {
+            #( 
+                /// Inbound channel for the Xth inbound channel.
+                pub #item_seq5: #channel 
+            ),*
+        }
+
+        impl<
+                Key: Default + Clone + Debug + PartialEq + Eq + PartialOrd + Ord
+                     + Sync + Send + 'static,
+                #( #type_with_bounds_seq ),*
+            >
+            InboundHub<
+                NullProp,
+                #state_struct<Key, #( #type_seq4),*>,
+                #outbound_struct<Key, #( #type_seq5),*>,
+                NullRequest,
+                #inbound_message_enum<Key, #( #type_seq6),*>,
+            > for #inbound_struct<Key, #( #type_seq7),*>
+        {
+            fn from_builder(
+                builder: &mut crate::core::ActorBuilder<
+                    NullProp,
+                    #state_struct<Key, #( #type_seq8),*>,
+                    #outbound_struct<Key, #( #type_seq9),*>,
+                    NullRequest,
+                    #inbound_message_enum<Key, #( #type_seq10),*>,
+                >,
+                actor_name: &str,
+            ) -> Self {
+                #(
+                let #item_seq = InboundChannel::new(
+                    builder.context,
+                    actor_name,
+                    &builder.sender,
+                    stringify!(#type_seq12).to_owned(),
+                );
+                builder
+                    .forward
+                    .insert(#item_seq2.name.clone(), Box::new(#item_seq3.clone()));
+                )*
+
+                Self { #( #item_seq4 ),* }
+            }
+        }
+    };
+
+    TokenStream::from(expanded)
+}
+
+pub(crate) fn zip_onmessage_n_impl(input: TokenStream) -> TokenStream {
+    let ZipInput { num_fields } = match parse2(input) {
+        Ok(input) => input,
+        Err(err) => return TokenStream::from(err.to_compile_error()),
+    };
+
+    let tuple_struct = format_ident!("Tuple{}", num_fields);
+    let inbound_message_enum = format_ident!("Zip{}InboundMessage", num_fields);
+
+    let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i));
+
+    let type_with_bounds_seq: Vec<_> = (0..num_fields)
+        .map(|i| {
+            let ident = format_ident!("Item{}", i);
+            quote! {
+                #ident: Default + Clone + Debug + Sync + Send + 'static
+            }
+        })
+        .collect();
+
+    let front_seq = (0..num_fields).map(|i| format_ident!("front{}", i));
+
+    let item_seq = (0..num_fields).map(|i| format_ident!("item{}", i));
+    let item_seq2 = item_seq.clone();
+    let item_seq3 = item_seq.clone();
+
+    let item_heap = (0..num_fields).map(|i| format_ident!("item{}_heap", i));
+    let item_heap2 = item_heap.clone();
+    let item_heap3 = item_heap.clone();
+
+    let key_seq = (0..num_fields).map(|i| format_ident!("key{}", i));
+    let key_seq2 = key_seq.clone();
+    let key_seq3 = key_seq.clone();
+    let key_seq4 = key_seq.clone();
+    let key_seq5 = key_seq.clone();
+
+    let case: Vec<_> = (0..num_fields)
+        .map(|i| {
+            let item_type = format_ident!("Item{}", i);
+
+            let item_heap = format_ident!("item{}_heap", i);
+            let item_heap_seq = (0..num_fields).map(|i| format_ident!("item{}_heap", i));
+
+            quote! {
+                #inbound_message_enum::#item_type(msg) => {
+                    state.#item_heap.push(Reverse(msg));
+                    loop {
+                        if #( state.#item_heap_seq.len() == 0 )||*
+                        {
+                            break;
+                        }
+                        check_and_send(state);
+                    }
+                }
+            }
+        })
+        .collect();
+
+    let expanded = quote! {
+
+        impl<Key: Default + Clone + Debug + PartialEq + Eq + PartialOrd + Ord + Sync + Send,
+            #( #type_with_bounds_seq ),*> OnMessage for #inbound_message_enum<Key, #(#type_seq), *>
+        {
+            fn on_message(
+                self,
+                _prop: &Self::Prop,
+                state: &mut Self::State,
+                outbound: &Self::OutboundHub,
+                _request: &Self::RequestHub)
+            {
+                let check_and_send = |s: &mut Self::State| {
+                    #(
+                    let #front_seq = s.#item_heap.peek().unwrap();
+                    let #key_seq2 = #front_seq.0.key.clone();
+                    )*
+
+                    let mut min = key0.clone();
+                    #(
+                    min = std::cmp::min(#key_seq3.clone(), min.clone());
+                    )*
+
+                    if #(#key_seq4 == min) && * {
+                        #(let #item_seq = s.#item_heap2.pop().unwrap();)*
+
+                        outbound.zipped.send(#tuple_struct {
+                            key: min,
+                            #(#item_seq2 : #item_seq3.0.value),*
+                        });
+                        return;
+                    }
+                    #(
+                    if #key_seq5 == min {
+                        s.#item_heap3.pop();
+                    }
+                    )*
+                };
+
+                match self {
+                    #( #case )*
+                }
+            }
+        }
+    };
+
+    TokenStream::from(expanded)
+}
diff --git a/hollywood_macros/src/core.rs b/hollywood_macros/src/core.rs
new file mode 100644
index 0000000..4600252
--- /dev/null
+++ b/hollywood_macros/src/core.rs
@@ -0,0 +1,492 @@
+use convert_case::{Case, Casing};
+use proc_macro2::TokenStream;
+use quote::quote;
+use syn::{
+    parse::Parse, parse::ParseStream, parse2, Error, Fields, Generics, Ident, Item, ItemEnum,
+    ItemStruct, Path, PathArguments, Result, Token, Type, TypePath,
+};
+
+/// Documentation is in the hollywood crate.
+pub(crate) fn actor_outputs_impl(_attr: TokenStream, item: TokenStream) -> TokenStream {
+    let ast = match parse2::<ItemStruct>(item) {
+        Ok(ast) => ast,
+        Err(err) => return err.to_compile_error(),
+    };
+    let struct_name = &ast.ident;
+    let generics = &ast.generics;
+    let (impl_generics, ty_generics, where_clause) = generics.split_for_impl();
+
+    let fields = match &ast.fields {
+        Fields::Named(fields_named) => &fields_named.named,
+        _ => panic!("`generate_outputs_trait` can only be used with structs with named fields"),
+    };
+
+    let output_assignments = fields.iter().map(|field| {
+        let field_name = &field.ident;
+        if let Some(inner_ty) = is_output_type(&field.ty) {
+            // if the field type is OutboundChannel<T>, use OutboundChannel::<T>
+            quote! {
+                #field_name: OutboundChannel::<#inner_ty>::new(
+                    context,
+                    stringify!(#field_name).to_owned(),
+                    actor_name,
+                )
+            }
+        } else {
+            panic!("field type must be OutboundChannel<T>.");
+        }
+    });
+
+    let output_extract = fields.iter().map(|field| {
+        let field_name = &field.ident;
+
+        quote! {
+            #field_name: self.#field_name.extract()
+        }
+    });
+
+    let output_act = fields.iter().map(|field| {
+        let field_name = &field.ident;
+
+        quote! {
+            self.#field_name.activate();
+        }
+    });
+
+    let gen = quote! {
+        impl #impl_generics OutboundHub for #struct_name #ty_generics #where_clause {
+            fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self {
+                Self {
+                    #(#output_assignments),*
+                }
+            }
+        }
+
+        impl #impl_generics Activate for #struct_name #ty_generics #where_clause {
+            fn extract(&mut self) -> Self {
+                Self {
+                    #(#output_extract),*
+                }
+            }
+
+            fn activate(&mut self) {
+                #(#output_act)*
+            }
+        }
+
+        #ast
+    };
+
+    gen.into()
+}
+
+// This function checks if the field's type is OutboundChannel<T> and return T if it is
+fn is_output_type(ty: &Type) -> Option<&Type> {
+    if let Type::Path(TypePath {
+        path: Path { segments, .. },
+        ..
+    }) = ty
+    {
+        if segments.len() == 1 && segments[0].ident == "OutboundChannel" {
+            if let PathArguments::AngleBracketed(args) = &segments[0].arguments {
+                if args.args.len() == 1 {
+                    if let syn::GenericArgument::Type(inner_ty) = args.args.first().unwrap() {
+                        return Some(inner_ty);
+                    }
+                }
+            }
+        }
+    }
+    None
+}
+
+/// Documentation is in the hollywood crate.
+pub(crate) fn actor_requests_impl(_attr: TokenStream, item: TokenStream) -> TokenStream {
+    let ast = match parse2::<ItemStruct>(item) {
+        Ok(ast) => ast,
+        Err(err) => return err.to_compile_error(),
+    };
+    let struct_name = &ast.ident;
+    let generics = &ast.generics;
+    let (impl_generics, ty_generics, where_clause) = generics.split_for_impl();
+
+    let fields = match &ast.fields {
+        Fields::Named(fields_named) => &fields_named.named,
+        _ => panic!("`generate_outputs_trait` can only be used with structs with named fields"),
+    };
+
+    let request_assignments = fields.iter().map(|field| {
+        let field_name = &field.ident;
+        quote! {
+            #field_name: RequestChannel::new(
+                stringify!(#field_name).to_owned(),
+                actor_name,
+                sender,
+            )
+        }
+    });
+
+    let request_extract = fields.iter().map(|field| {
+        let field_name = &field.ident;
+
+        quote! {
+            #field_name: self.#field_name.extract()
+        }
+    });
+
+    let output_act = fields.iter().map(|field| {
+        let field_name = &field.ident;
+
+        quote! {
+            self.#field_name.activate();
+        }
+    });
+
+    let field0 = fields.first().expect("Request struct must have at least one field");
+    let m_type = is_request_type(&field0.ty).unwrap()[2];
+
+    let gen = quote! {
+        impl #impl_generics RequestHub<#m_type> for #struct_name #ty_generics #where_clause {
+            fn from_parent_and_sender(
+                actor_name: &str, sender: &tokio::sync::mpsc::Sender<#m_type>
+            ) -> Self {
+                Self {
+                    #(#request_assignments),*
+                }
+            }
+        }
+
+        impl #impl_generics Activate for #struct_name #ty_generics #where_clause {
+            fn extract(&mut self) -> Self {
+                Self {
+                    #(#request_extract),*
+                }
+            }
+
+            fn activate(&mut self) {
+                #(#output_act)*
+            }
+        }
+
+        #ast
+    };
+
+    gen.into()
+}
+
+// This function checks if the field's type is RequestChannel<Request, Reply, M>
+fn is_request_type(ty: &Type) -> Option<[&Type; 3]> {
+    if let Type::Path(TypePath {
+        path: Path { segments, .. },
+        ..
+    }) = ty
+    {
+        if segments.len() == 1 && segments[0].ident == "RequestChannel" {
+            if let PathArguments::AngleBracketed(args) = &segments[0].arguments {
+                if args.args.len() == 3 {
+                    let mut pop_iter = args.args.iter();
+                    if let syn::GenericArgument::Type(request_ty) = pop_iter.nth(0).unwrap() {
+                        if let syn::GenericArgument::Type(reply_ty) = pop_iter.nth(0).unwrap() {
+                            if let syn::GenericArgument::Type(m_ty) = pop_iter.nth(0).unwrap() {
+                                return Some([request_ty, reply_ty, m_ty]);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+    None
+}
+
+/// Documentation is in the hollywood crate.
+pub fn actor_inputs_impl(args: TokenStream, inbound: TokenStream) -> TokenStream {
+    let ActorInbound {
+        struct_name,
+        prop_type,
+        state_type,
+        output_type,
+        request_type,
+    } = match parse2::<ActorInbound>(args) {
+        Ok(args) => args,
+        Err(err) => return err.to_compile_error(),
+    };
+    let ast = match parse2::<ItemEnum>(inbound) {
+        Ok(ast) => ast,
+        Err(err) => return err.to_compile_error(),
+    };
+
+    let name = &ast.ident;
+    let generics = &ast.generics;
+    let fields = &ast.variants;
+    let (impl_generics, ty_generics, where_clause) = generics.split_for_impl();
+
+    let inbound = fields.iter().map(|variant| {
+        let variant_name = &variant.ident;
+        let snake_case_variant_name_str = variant_name.to_string().to_case(Case::Snake);
+        let snake_case_variant_name = Ident::new(&snake_case_variant_name_str, variant_name.span());
+        let field_type = if let Fields::Unnamed(fields_unnamed) = &variant.fields {
+            &fields_unnamed.unnamed[0].ty
+        } else {
+            panic!("Enum variants must be tuples");
+        };
+
+        let msg = format!(
+            "`{}` channel field - autogenerated by the [actor_inputs] macro.",
+            variant_name
+        );
+        quote! {
+            #[doc = #msg]
+            pub #snake_case_variant_name: InboundChannel<#field_type, #name #ty_generics>
+        }
+    });
+
+    let match_arm = fields.iter().map(|variant| {
+        let variant_name = &variant.ident;
+        quote! {
+            #name::#variant_name(_) => {
+                stringify!(#variant_name).to_string()
+            }
+        }
+    });
+
+    let from_builder_inbounds = fields.iter().map(|variant| {
+        let variant_name = &variant.ident;
+        let snake_case_variant_name_str = variant_name.to_string().to_case(Case::Snake);
+        let snake_case_variant_name = Ident::new(&snake_case_variant_name_str, variant_name.span());
+
+        assert!(
+            generics.params.len() <= 1,
+            "Only zero or one generic parameter is supported, got {}",
+            generics.params.len()
+        );
+
+        let generic_ident =
+            if let Some(syn::GenericParam::Type(type_param)) = generics.params.first() {
+                // Extracts just the identifier of the type parameter (e.g., `T`)
+                Some(&type_param.ident)
+            } else {
+                None
+            };
+
+        let instantiation = if let Some(ident) = generic_ident {
+            // Use the extracted identifier directly
+            quote! { #name::#variant_name(#ident::default()) }
+        } else {
+            // When there are no generics
+            quote! { #name::#variant_name(Default::default()) }
+        };
+
+        quote! {
+            let #snake_case_variant_name = InboundChannel::new(
+                &mut builder.context,
+                actor_name.clone(),
+                &builder.sender,
+                #instantiation.inbound_channel(),
+            );
+            builder.forward.insert(
+                #snake_case_variant_name.name.clone(),
+                Box::new(#snake_case_variant_name.clone())
+            );
+        }
+    });
+
+    let from_builder_init = fields.iter().map(|variant| {
+        let variant_name = &variant.ident;
+        let snake_case_variant_name_str = variant_name.to_string().to_case(Case::Snake);
+        let snake_case_variant_name = Ident::new(&snake_case_variant_name_str, variant_name.span());
+
+        quote! {
+            #snake_case_variant_name,
+        }
+    });
+
+    let gen = quote! {
+        #ast
+
+        /// Auto-generated inbound hub for actor.
+        pub struct #struct_name #impl_generics #where_clause {
+            #(#inbound),*
+        }
+
+        impl #impl_generics InboundMessage for #name #ty_generics #where_clause {
+            type Prop = #prop_type;
+            type State = #state_type;
+            type OutboundHub = #output_type;
+            type RequestHub = #request_type;
+
+            fn inbound_channel(&self) -> String {
+                match self {
+                   #(#match_arm),*
+                }
+            }
+        }
+
+        impl #impl_generics InboundHub<
+            #prop_type,
+            #state_type,
+            #output_type,
+            #request_type,
+            #name #ty_generics> for #struct_name #ty_generics #where_clause
+        {
+            fn from_builder(
+                builder: &mut ActorBuilder<
+                    #prop_type,
+                    #state_type,
+                    #output_type,
+                    #request_type,
+                    #name
+                    #ty_generics
+                >,
+                actor_name: &str) -> Self
+            {
+                #(#from_builder_inbounds)*
+
+                #struct_name {
+                    #(#from_builder_init)*
+                }
+            }
+        }
+
+    };
+
+    gen.into()
+}
+
+struct ActorInbound {
+    struct_name: Ident,
+    prop_type: Ident,
+    state_type: Ident,
+    output_type: Ident,
+    request_type: Ident,
+}
+
+impl Parse for ActorInbound {
+    fn parse(inbound: ParseStream) -> Result<Self> {
+        let struct_name: Ident = inbound.parse()?;
+        let _: Generics = inbound.parse()?;
+        let _: Token![,] = inbound.parse()?;
+        let content;
+        syn::braced!(content in inbound);
+        let prop_type: Ident = content.parse()?;
+        let _: Token![,] = content.parse()?;
+        let state_type: Ident = content.parse()?;
+        let _: Token![,] = content.parse()?;
+        let output_type: Ident = content.parse()?;
+        let _: Token![,] = content.parse()?;
+        let request_type: Ident = content.parse()?;
+        Ok(ActorInbound {
+            struct_name,
+            prop_type,
+            state_type,
+            output_type,
+            request_type,
+        })
+    }
+}
+
+struct ActorArgs {
+    message_type: Ident,
+}
+
+impl Parse for ActorArgs {
+    fn parse(inbound_hub: ParseStream) -> Result<Self> {
+        let message_type: Ident = inbound_hub.parse()?;
+        Ok(ActorArgs { message_type })
+    }
+}
+
+/// Documentation is in the hollywood crate.
+pub fn actor_impl(attr: TokenStream, item: TokenStream) -> TokenStream {
+    // parse inbound
+    let ActorArgs { message_type } = match parse2::<ActorArgs>(attr) {
+        Ok(args) => args,
+        Err(err) => return err.to_compile_error(),
+    };
+    let inbound: Item = match parse2(item) {
+        Ok(inbound) => inbound,
+        Err(err) => return err.to_compile_error(),
+    };
+    let inbound_clone = inbound.clone();
+
+    // Get actor name from the item
+    let actor_name = match inbound {
+        Item::Type(item) => item.ident,
+        _ => panic!("`actor` attribute can only be used with type aliases"),
+    };
+
+    let mut inbound_clone = inbound_clone.clone();
+    let mut attrs = Vec::new();
+    if let Item::Type(item_type) = &mut inbound_clone {
+        attrs.append(&mut item_type.attrs);
+        // ...
+    }
+
+    let mut maybe_prop = None;
+    let mut maybe_inbounds = None;
+    let mut maybe_state = None;
+    let mut maybe_outputs = None;
+    let mut maybe_requests = None;
+
+    if let Item::Type(item_type) = inbound_clone {
+        if let Type::Path(type_path) = *item_type.ty {
+            if type_path.path.segments.last().unwrap().ident != "Actor" {
+                return Error::new_spanned(&type_path, "Expected Actor<...>")
+                    .to_compile_error()
+                    .into();
+            }
+            for segment in type_path.path.segments {
+                if let PathArguments::AngleBracketed(angle_bracketed_args) = segment.arguments {
+                    if angle_bracketed_args.args.len() != 5 {
+                        return Error::new_spanned(
+                            &angle_bracketed_args,
+                            concat!(
+                                "Expected 5 type arguments:",
+                                "Actor<PROP, INBOUNDS, STATE, OUTBOUNDS, REQUESTS>"
+                            ),
+                        )
+                        .to_compile_error()
+                        .into();
+                    }
+                    maybe_prop = Some(angle_bracketed_args.args[0].clone());
+                    maybe_inbounds = Some(angle_bracketed_args.args[1].clone());
+                    maybe_state = Some(angle_bracketed_args.args[2].clone());
+                    maybe_outputs = Some(angle_bracketed_args.args[3].clone());
+                    maybe_requests = Some(angle_bracketed_args.args[4].clone());
+                }
+            }
+        } else {
+            return Error::new_spanned(&item_type.ty, "Expected a type path")
+                .to_compile_error()
+                .into();
+        }
+    } else {
+        panic!("`actor` attribute can only be used with type aliases");
+    }
+
+    let prop = maybe_prop.unwrap();
+    let inbound = maybe_inbounds.unwrap();
+    let state_type = maybe_state.unwrap();
+    let out = maybe_outputs.unwrap();
+    let requests = maybe_requests.unwrap();
+
+    let runner_type = quote! { DefaultRunner<#prop, #inbound, #state_type,  #out, #requests> };
+
+    let gen = quote! {
+
+        ///
+        #( #attrs )*
+        pub type #actor_name = Actor<#prop, #inbound, #state_type, #out, #requests>;
+
+        impl FromPropState<
+                #prop, #inbound, #state_type, #out, #message_type, #requests, #runner_type
+            > for #actor_name
+        {
+            fn name_hint(prop: &#prop) -> String {
+                stringify!(#actor_name).to_owned()
+            }
+        }
+    };
+
+    gen.into()
+}
diff --git a/hollywood_macros/src/lib.rs b/hollywood_macros/src/lib.rs
index b32245a..d0f669e 100644
--- a/hollywood_macros/src/lib.rs
+++ b/hollywood_macros/src/lib.rs
@@ -2,352 +2,82 @@
 
 //! Convenience macros for the Hollywood actor framework.
 
+mod actors;
+mod core;
+
 extern crate proc_macro;
 
-use convert_case::{Case, Casing};
 use proc_macro::TokenStream;
 use quote::quote;
-use syn::{
-    parse::Parse, parse::ParseStream, parse_macro_input, Error, Fields, Ident, Item, ItemEnum,
-    ItemStruct, Path, PathArguments, Result, Token, Type, TypePath,
-};
 
-/// Documentation is in the hollywood crate.
+/// Documented in the root-level hollywood crate.
 #[proc_macro_attribute]
-pub fn actor_outputs(_attr: TokenStream, item: TokenStream) -> TokenStream {
-    let ast = parse_macro_input!(item as ItemStruct);
-
-    let struct_name = &ast.ident;
-    let fields = match &ast.fields {
-        Fields::Named(fields_named) => &fields_named.named,
-        _ => panic!("`generate_outputs_trait` can only be used with structs with named fields"),
-    };
-
-    let output_assignments = fields.iter().map(|field| {
-        let field_name = &field.ident;
-        if let Some(inner_ty) = is_output_type(&field.ty) {
-            // if the field type is OutboundChannel<T>, use OutboundChannel::<T>
-            quote! {
-                #field_name: OutboundChannel::<#inner_ty>::new(
-                    context,
-                    stringify!(#field_name).to_owned(),
-                    actor_name,
-                )
-            }
-        } else {
-            panic!("field type must be OutboundChannel<T>.");
-        }
-    });
-
-    let output_extract = fields.iter().map(|field| {
-        let field_name = &field.ident;
-
-        quote! {
-            #field_name: self.#field_name.extract()
-        }
-    });
-
-    let output_act = fields.iter().map(|field| {
-        let field_name = &field.ident;
-
-        quote! {
-            self.#field_name.activate();
-        }
-    });
-
-    let gen = quote! {
-        #ast
-
-        impl OutboundHub for #struct_name {
-            fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self {
-                Self {
-                    #(#output_assignments),*
-                }
-            }
-        }
-
-        impl Morph for #struct_name {
-            fn extract(&mut self) -> Self {
-                Self {
-                    #(#output_extract),*
-                }
-            }
-
-            fn activate(&mut self) {
-                #(#output_act)*
-            }
-        }
-
-    };
-
-    gen.into()
+pub fn actor_outputs(attr: TokenStream, item: TokenStream) -> TokenStream {
+    core::actor_outputs_impl(
+        proc_macro2::TokenStream::from(attr),
+        proc_macro2::TokenStream::from(item),
+    )
+    .into()
 }
 
-// This function checks if the field's type is OutboundChannel<T> and return T if it is
-fn is_output_type(ty: &Type) -> Option<&Type> {
-    if let Type::Path(TypePath {
-        path: Path { segments, .. },
-        ..
-    }) = ty
-    {
-        if segments.len() == 1 && segments[0].ident == "OutboundChannel" {
-            if let PathArguments::AngleBracketed(args) = &segments[0].arguments {
-                if args.args.len() == 1 {
-                    if let syn::GenericArgument::Type(inner_ty) = args.args.first().unwrap() {
-                        return Some(inner_ty);
-                    }
-                }
-            }
-        }
-    }
-    None
+/// Documented in the root-level hollywood crate.
+#[proc_macro_attribute]
+pub fn actor_requests(attr: TokenStream, item: TokenStream) -> TokenStream {
+    core::actor_requests_impl(
+        proc_macro2::TokenStream::from(attr),
+        proc_macro2::TokenStream::from(item),
+    )
+    .into()
 }
 
-/// Documentation is in the hollywood crate.
+/// Documented in the root-level hollywood crate.
 #[proc_macro_attribute]
 pub fn actor_inputs(args: TokenStream, inbound: TokenStream) -> TokenStream {
-    let args = parse_macro_input!(args as ActorInbound);
-    let ast = parse_macro_input!(inbound as ItemEnum);
-
-    let name = &ast.ident;
-    let fields = &ast.variants;
-
-    let struct_name = &args.struct_name;
-    let prop_type = &args.prop_type;
-    let state_type = &args.state_type;
-    let output_type = &args.output_type;
-    let request_type = &args.request_type;
-
-    let inbound = fields.iter().map(|variant| {
-        let variant_name = variant.ident.clone();
-        let snake_case_variant_name_str = variant_name.to_string().to_case(Case::Snake);
-        let snake_case_variant_name = Ident::new(&snake_case_variant_name_str, variant_name.span());
-        let field_type = if let Fields::Unnamed(fields_unnamed) = &variant.fields {
-            &fields_unnamed.unnamed[0].ty
-        } else {
-            panic!("Enum variants must be tuples");
-        };
-
-        let msg = format!(
-            "`{}` channel field - autogenerated by the [actor_inputs] macro.",
-            variant_name
-        );
-        quote! {
-            #[doc = #msg]
-            pub  #snake_case_variant_name: InboundChannel<#field_type, #name>
-        }
-    });
-
-    let match_arm = fields.iter().map(|variant| {
-        let variant_name = &variant.ident;
-        if let Fields::Unnamed(fields_unnamed) = &variant.fields {
-            &fields_unnamed.unnamed[0].ty
-        } else {
-            panic!("Enum variants must be tuples");
-        };
-
-        quote! {
-            #name::#variant_name(msg) => {
-                stringify!(#variant_name).to_string()
-            }
-        }
-    });
-
-    let from_builder_inbounds = fields.iter().map(|variant| {
-        let variant_name = &variant.ident;
-        let snake_case_variant_name = variant.ident.clone().to_string().to_case(Case::Snake);
-        let snake_case_variant_name = Ident::new(&snake_case_variant_name, variant_name.span());
-        let field_type = if let Fields::Unnamed(fields_unnamed) = &variant.fields {
-            &fields_unnamed.unnamed[0].ty
-        } else {
-            panic!("Enum variants must be tuples");
-        };
-
-        quote! {
-            let #snake_case_variant_name = InboundChannel::<#field_type, #name>::new(
-                &mut builder.context,
-                actor_name.clone(),
-                &builder.sender,
-                #name::#variant_name(Default::default()).inbound_channel(),
-            );
-            builder
-                .forward
-                .insert(#snake_case_variant_name.name.clone(), Box::new(#snake_case_variant_name.clone()));
-        }
-    });
-
-    let from_builder_init = fields.iter().map(|variant| {
-        let variant_name = variant.ident.clone();
-        let snake_case_variant_name = variant_name.to_string().to_case(Case::Snake);
-        let snake_case_variant_name = Ident::new(&snake_case_variant_name, variant_name.span());
-
-        quote! {
-            #snake_case_variant_name,
-        }
-    });
-
-    let gen = quote! {
-        #ast
-
-        /// Auto-generated inbound hub for actor.
-        pub struct #struct_name {
-            #(#inbound),*
-        }
-
-        impl InboundMessage for #name {
-            type Prop = #prop_type;
-            type State = #state_type;
-            type OutboundHub = #output_type;
-            type RequestHub = #request_type;
-
-            fn inbound_channel(&self) -> String {
-                match self {
-                   #(#match_arm),*
-                }
-            }
-        }
-
-        impl InboundHub<#prop_type, #state_type, #output_type, #request_type,#name> for #struct_name {
-
-            fn from_builder(builder: &mut ActorBuilder<#prop_type, #state_type, #output_type,#request_type, #name>,
-                            actor_name: &str) -> Self {
-                #(#from_builder_inbounds)*
-
-                #struct_name {
-                    #(#from_builder_init)*
-                }
-            }
-        }
-
-    };
-
-    gen.into()
-}
-
-struct ActorArgs {
-    message_type: Ident,
+    core::actor_inputs_impl(
+        proc_macro2::TokenStream::from(args),
+        proc_macro2::TokenStream::from(inbound),
+    )
+    .into()
 }
 
-impl Parse for ActorArgs {
-    fn parse(inbound_hub: ParseStream) -> Result<Self> {
-        let message_type: Ident = inbound_hub.parse()?;
-        Ok(ActorArgs { message_type })
-    }
-}
-
-/// Documentation is in the hollywood crate.
+/// Documented in the root-level hollywood crate.
 #[proc_macro_attribute]
 pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream {
-    // parse inbound
-    let ActorArgs { message_type } = parse_macro_input!(attr as ActorArgs);
-    let inbound: Item = parse_macro_input!(item);
-    let inbound_clone = inbound.clone();
-
-    // Get actor name from the item
-    let actor_name = match inbound {
-        Item::Type(item) => item.ident,
-        _ => panic!("`actor` attribute can only be used with type aliases"),
-    };
-
-    let mut inbound_clone = inbound_clone.clone();
-    let mut attrs = Vec::new();
-    if let Item::Type(item_type) = &mut inbound_clone {
-        attrs.append(&mut item_type.attrs);
-        // ...
-    }
-
-    let mut maybe_prop = None;
-    let mut maybe_inbounds = None;
-    let mut maybe_state = None;
-    let mut maybe_outputs = None;
-    let mut maybe_requests = None;
-
-    if let Item::Type(item_type) = inbound_clone {
-        if let Type::Path(type_path) = *item_type.ty {
-            if type_path.path.segments.last().unwrap().ident != "Actor" {
-                return Error::new_spanned(&type_path, "Expected Actor<...>")
-                    .to_compile_error()
-                    .into();
-            }
-            for segment in type_path.path.segments {
-                if let PathArguments::AngleBracketed(angle_bracketed_args) = segment.arguments {
-                    if angle_bracketed_args.args.len() != 5 {
-                        return Error::new_spanned(
-                            &angle_bracketed_args,
-                            "Expected 5 type arguments: Actor<PROP, INBOUNDS, STATE, OUTBOUNDS, REQUESTS>",
-                        )
-                        .to_compile_error()
-                        .into();
-                    }
-                    maybe_prop = Some(angle_bracketed_args.args[0].clone());
-                    maybe_inbounds = Some(angle_bracketed_args.args[1].clone());
-                    maybe_state = Some(angle_bracketed_args.args[2].clone());
-                    maybe_outputs = Some(angle_bracketed_args.args[3].clone());
-                    maybe_requests = Some(angle_bracketed_args.args[4].clone());
-                }
-            }
-        } else {
-            return Error::new_spanned(&item_type.ty, "Expected a type path")
-                .to_compile_error()
-                .into();
-        }
-    } else {
-        panic!("`actor` attribute can only be used with type aliases");
-    }
-
-    let prop = maybe_prop.unwrap();
-    let inbound = maybe_inbounds.unwrap();
-    let state_type = maybe_state.unwrap();
-    let out = maybe_outputs.unwrap();
-    let requests = maybe_requests.unwrap();
-
-    let runner_type = quote! { DefaultRunner<#prop, #inbound, #state_type,  #out, #requests> };
-
-    let gen = quote! {
-
-        ///
-        #( #attrs )*
-        pub type #actor_name = Actor<#prop, #inbound, #state_type, #out, #requests>;
-
-        impl FromPropState<#prop, #inbound, #state_type, #out, #message_type, #requests, #runner_type>
-            for #actor_name
-        {
-            fn name_hint(prop: &#prop) -> String {
-                stringify!(#actor_name).to_owned()
-            }
-        }
-    };
-
-    gen.into()
+    core::actor_impl(
+        proc_macro2::TokenStream::from(attr),
+        proc_macro2::TokenStream::from(item),
+    )
+    .into()
 }
 
-struct ActorInbound {
-    struct_name: Ident,
-    prop_type: Ident,
-    state_type: Ident,
-    output_type: Ident,
-    request_type: Ident,
-}
+/// Documented in the root-level hollywood crate.
+#[proc_macro]
+pub fn zip_n(input: TokenStream) -> TokenStream {
+    let parsed = proc_macro2::TokenStream::from(input.clone());
+    let parsed2 = parsed.clone();
+    let parsed3 = parsed.clone();
+    let parsed4 = parsed.clone();
+    let parsed5 = parsed.clone();
+    let parsed6 = parsed.clone();
+    let parsed7 = parsed.clone();
+
+    let output_tuple_n = actors::zip::tuple_n_impl(parsed);
+    let output_zip_outbound_n = actors::zip::zip_outbound_n_impl(parsed2);
+    let output_zip_state_n = actors::zip::zip_state_n_impl(parsed3);
+    let output_inbound_message_n = actors::zip::zip_inbound_message_n_impl(parsed4);
+    let output_zip_n = actors::zip::zip_n_impl(parsed5);
+    let output_zip_inbound_n = actors::zip::zip_inbound_n_impl(parsed6);
+    let output_zip_onmessage_n_new = actors::zip::zip_onmessage_n_impl(parsed7);
+
+    let combined_output = quote! {
+        #output_tuple_n
+        #output_zip_outbound_n
+        #output_zip_state_n
+        #output_inbound_message_n
+        #output_zip_n
+        #output_zip_inbound_n
+        #output_zip_onmessage_n_new
+    };
 
-impl Parse for ActorInbound {
-    fn parse(inbound: ParseStream) -> Result<Self> {
-        let struct_name: Ident = inbound.parse()?;
-        let _: Token![,] = inbound.parse()?;
-        let content;
-        syn::braced!(content in inbound);
-        let prop_type: Ident = content.parse()?;
-        let _: Token![,] = content.parse()?;
-        let state_type: Ident = content.parse()?;
-        let _: Token![,] = content.parse()?;
-        let output_type: Ident = content.parse()?;
-        let _: Token![,] = content.parse()?;
-        let request_type: Ident = content.parse()?;
-        Ok(ActorInbound {
-            struct_name,
-            prop_type,
-            state_type,
-            output_type,
-            request_type
-        })
-    }
+    TokenStream::from(combined_output)
 }
diff --git a/src/actors.rs b/src/actors.rs
index 6f737dc..a21f4e2 100644
--- a/src/actors.rs
+++ b/src/actors.rs
@@ -5,3 +5,25 @@ pub use periodic::Periodic;
 /// Generic printer actor.
 pub mod printer;
 pub use printer::{Printer, PrinterProp};
+
+/// Nudge actor.
+pub mod nudge;
+pub use nudge::Nudge;
+
+/// Zip actor.
+pub mod zip;
+pub use zip::Zip2;
+pub use zip::Zip3;
+pub use zip::Zip4;
+pub use zip::Zip5;
+pub use zip::Zip6;
+pub use zip::Zip7;
+pub use zip::Zip8;
+pub use zip::Zip9;
+pub use zip::Zip10;
+pub use zip::Zip11;
+pub use zip::Zip12;
+
+/// Egui actor.
+#[cfg(feature="egui")]
+pub mod egui;
diff --git a/src/actors/nudge.rs b/src/actors/nudge.rs
new file mode 100644
index 0000000..188c003
--- /dev/null
+++ b/src/actors/nudge.rs
@@ -0,0 +1,153 @@
+use std::fmt::Debug;
+
+use async_trait::async_trait;
+use hollywood_macros::actor_outputs;
+
+use crate::compute::context::Context;
+use crate::core::connection::ConnectionEnum;
+use crate::core::NullState;
+
+use crate::core::request::NullRequest;
+use crate::core::{
+    actor::{ActorNode, FromPropState, GenericActor},
+    inbound::{ForwardMessage, NullInbound, NullMessage},
+    outbound::{Activate, OutboundChannel, OutboundHub},
+    runner::Runner,
+};
+
+/// Prop for the nudge actor.
+#[derive(Clone, Debug, Default)]
+pub struct NudgeProp<Item: Clone> {
+    /// The attached item.
+    pub item: Item,
+}
+
+/// A nudge actor.
+///
+/// All it does is to send a nudge message containing the item once.
+pub type Nudge<Item> = GenericActor<
+    NudgeProp<Item>,
+    NullInbound,
+    NullState,
+    NudgeOutbound<Item>,
+    NullRequest,
+    NudgeRunner,
+>;
+
+impl<Item: Default + Sync + Send + Debug + 'static + Clone> Nudge<Item> {
+    /// Create a new nudge actor
+    pub fn new(context: &mut Context, item: Item) -> Nudge<Item> {
+        Nudge::from_prop_and_state(context, NudgeProp::<Item> { item }, NullState::default())
+    }
+}
+
+impl<Item: Default + Sync + Send + Clone + Debug + 'static>
+    FromPropState<
+        NudgeProp<Item>,
+        NullInbound,
+        NullState,
+        NudgeOutbound<Item>,
+        NullMessage<NudgeProp<Item>, NullState, NudgeOutbound<Item>, NullRequest>,
+        NullRequest,
+        NudgeRunner,
+    > for Nudge<Item>
+{
+    fn name_hint(_prop: &NudgeProp<Item>) -> String {
+        "Nudge".to_owned()
+    }
+}
+
+/// Nudge outbound hub
+#[actor_outputs]
+pub struct NudgeOutbound<Item: 'static + Default + Clone + Send + Sync + std::fmt::Debug> {
+    /// Nudge outbound channel.
+    pub nudge: OutboundChannel<Item>,
+}
+
+
+/// The custom nudge runner
+pub struct NudgeRunner {}
+
+impl<Item: Default + Sync + Send + Clone + Debug + 'static>
+    Runner<
+        NudgeProp<Item>,
+        NullInbound,
+        NullState,
+        NudgeOutbound<Item>,
+        NullRequest,
+        NullMessage<NudgeProp<Item>, NullState, NudgeOutbound<Item>, NullRequest>,
+    > for NudgeRunner
+{
+    /// Create a new actor node.
+    fn new_actor_node(
+        name: String,
+        prop: NudgeProp<Item>,
+        state: NullState,
+        _receiver: tokio::sync::mpsc::Receiver<
+            NullMessage<NudgeProp<Item>, NullState, NudgeOutbound<Item>, NullRequest>,
+        >,
+        _forward: std::collections::HashMap<
+            String,
+            Box<
+                dyn ForwardMessage<
+                        NudgeProp<Item>,
+                        NullState,
+                        NudgeOutbound<Item>,
+                        NullRequest,
+                        NullMessage<NudgeProp<Item>, NullState, NudgeOutbound<Item>, NullRequest>,
+                    > + Send
+                    + Sync,
+            >,
+        >,
+        outbound: NudgeOutbound<Item>,
+        _request: NullRequest,
+    ) -> Box<dyn ActorNode + Send + Sync> {
+        Box::new(NudgeActor::<Item> {
+            name: name.clone(),
+            prop,
+            init_state: state.clone(),
+            state: None,
+            outbound: Some(outbound),
+        })
+    }
+}
+
+/// The nudge actor.
+pub struct NudgeActor<Item: Clone + 'static + Default + Clone + Send + Sync + std::fmt::Debug> {
+    name: String,
+    prop: NudgeProp<Item>,
+    init_state: NullState,
+    state: Option<NullState>,
+    outbound: Option<NudgeOutbound<Item>>,
+}
+
+#[async_trait]
+impl<Item: 'static + Default + Clone + Send + Sync + std::fmt::Debug> ActorNode
+    for NudgeActor<Item>
+{
+    fn name(&self) -> &String {
+        &self.name
+    }
+
+    fn reset(&mut self) {
+        self.state = Some(self.init_state.clone());
+    }
+
+    async fn run(&mut self, mut _kill: tokio::sync::broadcast::Receiver<()>) {
+        let mut outbound = self.outbound.take().unwrap();
+        outbound.activate();
+        self.reset();
+
+        match &outbound.nudge.connection_register {
+            ConnectionEnum::Config(_) => {
+                panic!("Cannot extract connection config")
+            }
+            ConnectionEnum::Active(active) => {
+                for i in active.maybe_registers.as_ref().unwrap().iter() {
+                    println!("NudgeActor: sending");
+                    i.send_impl(self.prop.item.clone());
+                }
+            }
+        }
+    }
+}
diff --git a/src/actors/periodic.rs b/src/actors/periodic.rs
index 4d9f5fc..64cd5d2 100644
--- a/src/actors/periodic.rs
+++ b/src/actors/periodic.rs
@@ -9,7 +9,7 @@ use crate::core::request::NullRequest;
 use crate::core::{
     actor::{ActorNode, FromPropState, GenericActor},
     inbound::{ForwardMessage, NullInbound, NullMessage},
-    outbound::{Morph, OutboundChannel, OutboundHub},
+    outbound::{Activate, OutboundChannel, OutboundHub},
     runner::Runner,
 };
 
@@ -97,7 +97,7 @@ pub struct PeriodicOutbound {
     pub time_stamp: OutboundChannel<f64>,
 }
 
-impl Morph for PeriodicOutbound {
+impl Activate for PeriodicOutbound {
     fn extract(&mut self) -> Self {
         Self {
             time_stamp: self.time_stamp.extract(),
diff --git a/src/actors/printer.rs b/src/actors/printer.rs
index 0badfe3..5291c5b 100644
--- a/src/actors/printer.rs
+++ b/src/actors/printer.rs
@@ -1,5 +1,7 @@
 use std::fmt::{Debug, Display};
 
+use hollywood_macros::actor_inputs;
+
 use crate::core::{
     request::NullRequest, Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel,
     InboundHub, InboundMessage, InboundMessageNew, NullOutbound, NullState, OnMessage,
@@ -22,12 +24,15 @@ impl Default for PrinterProp {
 
 /// Inbound message for the printer actor.
 #[derive(Clone, Debug)]
-pub enum PrinterInboundMessage<T: Display + Clone + Sync + Send + 'static> {
+#[actor_inputs(PrinterInbound<T>, {PrinterProp, NullState, NullOutbound, NullRequest})]
+pub enum PrinterInboundMessage<T: Default + Debug + Display + Clone + Sync + Send + 'static> {
     /// Printable message.
     Printable(T),
 }
 
-impl<T: Debug + Display + Clone + Sync + Send + 'static> OnMessage for PrinterInboundMessage<T> {
+impl<T: Default + Debug + Display + Clone + Sync + Send + 'static> OnMessage
+    for PrinterInboundMessage<T>
+{
     fn on_message(
         self,
         prop: &PrinterProp,
@@ -43,7 +48,7 @@ impl<T: Debug + Display + Clone + Sync + Send + 'static> OnMessage for PrinterIn
     }
 }
 
-impl<T: Debug + Display + Clone + Sync + Send + 'static> InboundMessageNew<T>
+impl<T: Default + Debug + Display + Clone + Sync + Send + 'static> InboundMessageNew<T>
     for PrinterInboundMessage<T>
 {
     fn new(_inbound_name: String, msg: T) -> Self {
@@ -51,7 +56,7 @@ impl<T: Debug + Display + Clone + Sync + Send + 'static> InboundMessageNew<T>
     }
 }
 
-/// Generic printer actor.
+/// Printer actor.
 pub type Printer<T> = Actor<PrinterProp, PrinterInbound<T>, NullState, NullOutbound, NullRequest>;
 
 impl<T: Clone + Sync + Default + Send + 'static + Debug + Display>
@@ -69,50 +74,3 @@ impl<T: Clone + Sync + Default + Send + 'static + Debug + Display>
         format!("Printer({})", prop.topic)
     }
 }
-
-/// Builder for the generic printer.
-pub struct PrinterInbound<T: Debug + Display + Clone + Sync + Send + 'static> {
-    /// Inbound channel to receive printable messages.
-    pub printable: InboundChannel<T, PrinterInboundMessage<T>>,
-}
-
-impl<T: Debug + Display + Clone + Sync + Send + 'static> InboundMessage
-    for PrinterInboundMessage<T>
-{
-    type Prop = PrinterProp;
-    type State = NullState;
-    type OutboundHub = NullOutbound;
-    type RequestHub = NullRequest;
-
-    fn inbound_channel(&self) -> String {
-        match self {
-            PrinterInboundMessage::Printable(_) => "Printable".to_owned(),
-        }
-    }
-}
-
-impl<T: Clone + Debug + Display + Default + Sync + Send + 'static>
-    InboundHub<PrinterProp, NullState, NullOutbound, NullRequest, PrinterInboundMessage<T>>
-    for PrinterInbound<T>
-{
-    fn from_builder(
-        builder: &mut ActorBuilder<
-            PrinterProp,
-            NullState,
-            NullOutbound,
-            NullRequest,
-            PrinterInboundMessage<T>,
-        >,
-        actor_name: &str,
-    ) -> Self {
-        let m = InboundChannel::new(
-            builder.context,
-            actor_name,
-            &builder.sender,
-            PrinterInboundMessage::Printable(T::default()).inbound_channel(),
-        );
-        builder.forward.insert(m.name.clone(), Box::new(m.clone()));
-
-        PrinterInbound { printable: m }
-    }
-}
diff --git a/src/actors/zip.rs b/src/actors/zip.rs
new file mode 100644
index 0000000..8614e33
--- /dev/null
+++ b/src/actors/zip.rs
@@ -0,0 +1,62 @@
+use std::cmp::Reverse;
+use std::fmt::Debug;
+use std::fmt::Display;
+
+use hollywood_macros::zip_n;
+
+use crate::compute::Context;
+use crate::core::request::NullRequest;
+use crate::core::Activate;
+use crate::core::Actor;
+use crate::core::DefaultRunner;
+use crate::core::FromPropState;
+use crate::core::InboundChannel;
+use crate::core::InboundHub;
+use crate::core::InboundMessage;
+use crate::core::InboundMessageNew;
+use crate::core::NullProp;
+use crate::core::OnMessage;
+use crate::core::OutboundChannel;
+use crate::core::OutboundHub;
+
+/// Type of the Xth inbound channel for the zip actor.
+#[derive(Clone, Debug, Default)]
+pub struct ZipPair<const N: usize, Key: PartialEq + Eq + PartialOrd + Ord, Value> {
+    /// Key to associate message from different inbound channels with.
+    pub key: Key,
+    /// The value to be zipped.
+    pub value: Value,
+}
+
+impl<const N: usize, Key: PartialEq + Eq + PartialOrd + Ord, T> PartialEq for ZipPair<N, Key, T> {
+    fn eq(&self, other: &Self) -> bool {
+        self.key == other.key
+    }
+}
+
+impl<const N: usize, Key: PartialEq + Eq + PartialOrd + Ord, T> Eq for ZipPair<N, Key, T> {}
+
+impl<const N: usize, Key: PartialEq + Eq + PartialOrd + Ord, T> PartialOrd for ZipPair<N, Key, T> {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl<const N: usize, Key: PartialEq + Eq + PartialOrd + Ord, T> Ord for ZipPair<N, Key, T> {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.key.cmp(&other.key)
+    }
+}
+
+
+zip_n!(2);
+zip_n!(3);
+zip_n!(4);
+zip_n!(5);
+zip_n!(6);
+zip_n!(7);
+zip_n!(8);
+zip_n!(9);
+zip_n!(10);
+zip_n!(11);
+zip_n!(12);
diff --git a/src/compute.rs b/src/compute.rs
index 96bbd13..9df60cc 100644
--- a/src/compute.rs
+++ b/src/compute.rs
@@ -4,12 +4,10 @@ pub use context::Context;
 
 /// The compute graph of actors.
 pub mod pipeline;
-pub use pipeline::Pipeline;
 pub(crate) use pipeline::CancelRequest;
+pub use pipeline::Pipeline;
 
 /// The graph topology.
 pub mod topology;
-pub(crate) use topology::Topology;
 pub(crate) use topology::ActorNode;
-
-
+pub(crate) use topology::Topology;
diff --git a/src/compute/context.rs b/src/compute/context.rs
index d648967..fb6d4c2 100644
--- a/src/compute/context.rs
+++ b/src/compute/context.rs
@@ -2,9 +2,7 @@ use std::marker::PhantomData;
 use std::sync::Arc;
 
 use crate::compute::{CancelRequest, Pipeline, Topology};
-use crate::core::{
-    InboundChannel, InboundMessage, OutboundChannel, OutboundConnection, ActorNode,
-};
+use crate::core::{ActorNode, InboundChannel, InboundMessage, OutboundChannel, OutboundConnection};
 
 /// The context of the compute graph which is used to configure the network topology.
 ///
@@ -35,7 +33,7 @@ impl Context {
         self.cancel_request_sender_template.clone()
     }
 
-       /// Registers an outbound channel for cancel request.
+    /// Registers an outbound channel for cancel request.
     ///
     /// Upon receiving a cancel request the registered outbound channel, the execution of the
     /// pipeline will be stopped.
@@ -49,7 +47,6 @@ impl Context {
             }));
     }
 
-
     fn new() -> Self {
         let (cancel_request_sender_template, cancel_request_receiver) =
             tokio::sync::mpsc::channel(1);
@@ -84,13 +81,14 @@ impl Context {
     }
 
     pub(crate) fn connect_impl<
-        T: Clone + std::fmt::Debug + Sync + Send + 'static,
+        T0: Clone + std::fmt::Debug + Sync + Send + 'static,
+        T1: Clone + std::fmt::Debug + Sync + Send + 'static,
         M: InboundMessage,
     >(
         &mut self,
-        outbound: &mut OutboundChannel<T>,
-        inbound: &mut InboundChannel<T, M>,
+        outbound: &mut OutboundChannel<T0>,
+        inbound: &mut InboundChannel<T1, M>,
     ) {
         self.topology.connect(outbound, inbound);
-    } 
+    }
 }
diff --git a/src/compute/pipeline.rs b/src/compute/pipeline.rs
index 20256f9..a50b34a 100644
--- a/src/compute/pipeline.rs
+++ b/src/compute/pipeline.rs
@@ -65,8 +65,8 @@ impl Pipeline {
 
     /// Executes the compute graph.
     ///
-    /// It consumes the self, starts  execution of the pipeline and returns a future (since it is 
-    /// an async function) that resolves to the pipeline itself. The future is completed when all 
+    /// It consumes the self, starts  execution of the pipeline and returns a future (since it is
+    /// an async function) that resolves to the pipeline itself. The future is completed when all
     /// actors have completed their execution.
     ///
     /// In particular, [ActorNode::run()] is called for each actor in the pipeline in a dedicated
diff --git a/src/compute/topology.rs b/src/compute/topology.rs
index b401bd9..b5c0b2a 100644
--- a/src/compute/topology.rs
+++ b/src/compute/topology.rs
@@ -149,12 +149,13 @@ impl Topology {
     }
 
     pub(crate) fn connect<
-        T: Clone + std::fmt::Debug + Sync + Send + 'static,
+        T0: Clone + std::fmt::Debug + Sync + Send + 'static,
+        T1: Clone + std::fmt::Debug + Sync + Send + 'static,
         M: InboundMessage,
     >(
         &mut self,
-        outbound: &mut OutboundChannel<T>,
-        inbound: &mut InboundChannel<T, M>,
+        outbound: &mut OutboundChannel<T0>,
+        inbound: &mut InboundChannel<T1, M>,
     ) {
         let output_parent_idx = self
             .unique_idx_name_pairs
diff --git a/src/core.rs b/src/core.rs
index 2047f92..f4d50dc 100644
--- a/src/core.rs
+++ b/src/core.rs
@@ -2,8 +2,8 @@
 
 /// Actor
 pub mod actor;
-pub use actor::{Actor, FromPropState};
 pub(crate) use actor::ActorNode;
+pub use actor::{Actor, FromPropState};
 
 /// Actor builder
 pub mod actor_builder;
@@ -17,13 +17,14 @@ pub use inbound::{
     OnMessage,
 };
 
-/// Outbound 
+/// Outbound
 pub mod outbound;
 pub(crate) use outbound::OutboundConnection;
-pub use outbound::{Morph, NullOutbound, OutboundChannel, OutboundHub};
+pub use outbound::{Activate, NullOutbound, OutboundChannel, OutboundHub};
 
 /// Request
 pub mod request;
+pub use request::{NullRequest, RequestHub};
 
 /// Connection
 pub mod connection;
diff --git a/src/core/actor.rs b/src/core/actor.rs
index f67f8ba..8aef207 100644
--- a/src/core/actor.rs
+++ b/src/core/actor.rs
@@ -80,7 +80,7 @@ pub trait FromPropState<
 
         let mut builder = ActorBuilder::new(context, &actor_name, prop, initial_state);
 
-        let request = Request::from_context_and_parent(&actor_name, &builder.sender);
+        let request = Request::from_parent_and_sender(&actor_name, &builder.sender);
 
         let inbound = Inbound::from_builder(&mut builder, &actor_name);
         builder.build::<Inbound, Run>(inbound, out, request)
diff --git a/src/core/actor_builder.rs b/src/core/actor_builder.rs
index 7c6c995..c7ffe82 100644
--- a/src/core/actor_builder.rs
+++ b/src/core/actor_builder.rs
@@ -1,5 +1,3 @@
-
-
 use crate::compute::context::Context;
 use crate::core::{
     actor::GenericActor,
@@ -14,14 +12,7 @@ use super::request::RequestHub;
 /// Creates actor from its components.
 ///
 /// Used in  [`InboundHub::from_builder`] public interface.
-pub struct ActorBuilder<
-    'a,
-    Prop,
-    State,
-    OutboundHub,
-    Request: RequestHub<M>,
-    M: InboundMessage,
-> {
+pub struct ActorBuilder<'a, Prop, State, OutboundHub, Request: RequestHub<M>, M: InboundMessage> {
     /// unique identifier of the actor
     pub actor_name: String,
     prop: Prop,
diff --git a/src/core/connection/outbound_connection.rs b/src/core/connection/outbound_connection.rs
index 533a800..9c9a4f6 100644
--- a/src/core/connection/outbound_connection.rs
+++ b/src/core/connection/outbound_connection.rs
@@ -1,8 +1,8 @@
 use std::sync::Arc;
 
-use crate::core::{outbound::GenericConnection, Morph};
+use crate::core::{outbound::GenericConnection, Activate};
 
-use super::{ConnectionRegister, ConnectionEnum};
+use super::{ConnectionEnum, ConnectionRegister};
 
 pub(crate) struct ConnectionConfig<T> {
     pub connection_register: ConnectionRegister<T>,
@@ -37,7 +37,6 @@ pub(crate) struct ActiveConnection<T> {
     pub maybe_register_landing_pad: Option<tokio::sync::oneshot::Receiver<ConnectionRegister<T>>>,
 }
 
-
 impl<T: Clone + Send + Sync + std::fmt::Debug + 'static> ConnectionEnum<T> {
     pub fn new() -> Self {
         Self::Config(ConnectionConfig::new())
@@ -68,7 +67,7 @@ impl<T: Clone + Send + Sync + std::fmt::Debug + 'static> ConnectionEnum<T> {
     }
 }
 
-impl<T> Morph for ConnectionEnum<T> {
+impl<T> Activate for ConnectionEnum<T> {
     fn extract(&mut self) -> Self {
         match self {
             Self::Config(config) => Self::Active(ActiveConnection {
diff --git a/src/core/connection/request_connection.rs b/src/core/connection/request_connection.rs
index 5b4d8da..fc640a4 100644
--- a/src/core/connection/request_connection.rs
+++ b/src/core/connection/request_connection.rs
@@ -2,7 +2,7 @@ use std::{marker::PhantomData, sync::Arc};
 
 use tokio::sync::mpsc::error::SendError;
 
-use crate::core::{InboundMessage, InboundMessageNew, Morph};
+use crate::core::{InboundMessage, InboundMessageNew, Activate};
 
 use super::{RequestConnectionEnum, RequestConnectionRegister};
 
@@ -103,7 +103,7 @@ impl<T: Send + Sync + std::fmt::Debug + 'static> RequestConnectionEnum<T> {
     }
 }
 
-impl<T> Morph for RequestConnectionEnum<T> {
+impl<T> Activate for RequestConnectionEnum<T> {
     fn extract(&mut self) -> Self {
         match self {
             Self::Config(config) => Self::Active(ActiveRequestConnection {
diff --git a/src/core/outbound.rs b/src/core/outbound.rs
index 8f26d3f..2a279ab 100644
--- a/src/core/outbound.rs
+++ b/src/core/outbound.rs
@@ -1,13 +1,13 @@
 use std::{marker::PhantomData, sync::Arc};
 use tokio::sync::mpsc::error::SendError;
 
+use super::connection::ConnectionEnum;
 use crate::compute::context::Context;
 use crate::core::inbound::{InboundChannel, InboundMessage, InboundMessageNew};
-
-use super::connection::ConnectionEnum;
+use std::fmt::{Debug, Formatter};
 
 /// OutboundHub is a collection of outbound channels for the actor.
-pub trait OutboundHub: Send + Sync + 'static + Morph {
+pub trait OutboundHub: Send + Sync + 'static + Activate {
     /// Creates the OutboundHub from context and the actor name.
     fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self;
 }
@@ -16,7 +16,7 @@ pub trait OutboundHub: Send + Sync + 'static + Morph {
 #[derive(Debug, Clone)]
 pub struct NullOutbound {}
 
-impl Morph for NullOutbound {
+impl Activate for NullOutbound {
     fn extract(&mut self) -> Self {
         Self {}
     }
@@ -39,7 +39,7 @@ pub struct OutboundChannel<T> {
     pub(crate) connection_register: ConnectionEnum<T>,
 }
 
-impl<T: Default + Clone + Send + Sync + std::fmt::Debug + 'static> OutboundChannel<T> {
+impl<OutT: Default + Clone + Send + Sync + std::fmt::Debug + 'static> OutboundChannel<OutT> {
     /// Create a new outbound for actor in provided context.    
     pub fn new(context: &mut Context, name: String, actor_name: &str) -> Self {
         context.assert_unique_outbound_name(name.clone(), actor_name);
@@ -52,35 +52,56 @@ impl<T: Default + Clone + Send + Sync + std::fmt::Debug + 'static> OutboundChann
     }
 
     /// Connect the outbound channel from this actor to the inbound channel of another actor.
-    pub fn connect<M: InboundMessageNew<T>>(
+    pub fn connect<M: InboundMessageNew<OutT>>(
         &mut self,
         ctx: &mut Context,
-        inbound: &mut InboundChannel<T, M>,
+        inbound: &mut InboundChannel<OutT, M>,
     ) {
         ctx.connect_impl(self, inbound);
-        self.connection_register.push(Arc::new(OutboundConnection {
-            sender: inbound.sender.clone(),
-            inbound_channel: inbound.name.clone(),
-            phantom: PhantomData {},
-        }));
+        self.connection_register
+            .push(Arc::new(OutboundConnection::<OutT, M> {
+                sender: inbound.sender.clone(),
+                inbound_channel: inbound.name.clone(),
+                phantom: PhantomData,
+            }));
+    }
+
+    /// Connect the outbound channel of type OutT to the inbound channel of another type InT.
+    /// The user provided adapter function is used to convert from OutT to InT.
+    pub fn connect_with_adapter<
+        InT: Default + Clone + Send + Sync + std::fmt::Debug + 'static,
+        M: InboundMessageNew<InT>,
+    >(
+        &mut self,
+        ctx: &mut Context,
+        adapter: fn(OutT) -> InT,
+        inbound: &mut InboundChannel<InT, M>,
+    ) {
+        ctx.connect_impl(self, inbound);
+        self.connection_register
+            .push(Arc::new(OutboundConnectionWithAdapter::<OutT, InT, M> {
+                sender: inbound.sender.clone(),
+                inbound_channel: inbound.name.clone(),
+                adapter,
+            }));
     }
 
     /// Send a message to the connected inbound channels to other actors.
-    pub fn send(&self, msg: T) {
+    pub fn send(&self, msg: OutT) {
         self.connection_register.send(msg);
     }
 }
 
-/// Trait for morphing state of an outbound channel.
-pub trait Morph {
-    /// Extract outbound channel and returns it.
+/// Outbound/request channel activation
+pub trait Activate {
+    /// Extract outbound/request channel and returns it.
     fn extract(&mut self) -> Self;
 
-    /// Activates the outbound channel to be used.
+    /// Activates the outbound/request channel to be used.
     fn activate(&mut self);
 }
 
-impl<T> Morph for OutboundChannel<T> {
+impl<T> Activate for OutboundChannel<T> {
     fn activate(&mut self) {
         self.connection_register.activate();
     }
@@ -94,19 +115,36 @@ impl<T> Morph for OutboundChannel<T> {
     }
 }
 
-#[derive(Debug, Clone)]
-pub(crate) struct OutboundConnection<T, M: InboundMessage> {
+#[derive(Clone, Debug)]
+pub(crate) struct OutboundConnection<Out, M: InboundMessage> {
     pub(crate) sender: tokio::sync::mpsc::Sender<M>,
     pub(crate) inbound_channel: String,
-    pub(crate) phantom: PhantomData<T>,
+    pub(crate) phantom: std::marker::PhantomData<Out>,
+}
+
+#[derive(Clone)]
+pub(crate) struct OutboundConnectionWithAdapter<Out, InT, M: InboundMessage> {
+    pub(crate) sender: tokio::sync::mpsc::Sender<M>,
+    pub(crate) inbound_channel: String,
+    pub(crate) adapter: fn(Out) -> InT,
+}
+
+impl<Out, InT, M: InboundMessage> Debug for OutboundConnectionWithAdapter<Out, InT, M> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("OutboundConnection")
+            .field("inbound_channel", &self.inbound_channel)
+            .finish()
+    }
 }
 
 pub(crate) trait GenericConnection<T>: Send + Sync {
     fn send_impl(&self, msg: T);
 }
 
-impl<T: Send + Sync, M: InboundMessageNew<T>> GenericConnection<T> for OutboundConnection<T, M> {
-    fn send_impl(&self, msg: T) {
+impl<Out: Send + Sync, M: InboundMessageNew<Out>> GenericConnection<Out>
+    for OutboundConnection<Out, M>
+{
+    fn send_impl(&self, msg: Out) {
         let msg = M::new(self.inbound_channel.clone(), msg);
         let c = self.sender.clone();
         let handler = tokio::spawn(async move {
@@ -120,3 +158,21 @@ impl<T: Send + Sync, M: InboundMessageNew<T>> GenericConnection<T> for OutboundC
         std::mem::drop(handler);
     }
 }
+
+impl<Out: Send + Sync, InT, M: InboundMessageNew<InT>> GenericConnection<Out>
+    for OutboundConnectionWithAdapter<Out, InT, M>
+{
+    fn send_impl(&self, msg: Out) {
+        let msg = M::new(self.inbound_channel.clone(), (self.adapter)(msg));
+        let c = self.sender.clone();
+        let handler = tokio::spawn(async move {
+            match c.send(msg).await {
+                Ok(_) => {}
+                Err(SendError(_)) => {
+                    println!("SendError");
+                }
+            }
+        });
+        std::mem::drop(handler);
+    }
+}
diff --git a/src/core/request.rs b/src/core/request.rs
index 4e27fa1..44c074f 100644
--- a/src/core/request.rs
+++ b/src/core/request.rs
@@ -5,12 +5,12 @@ use crate::compute::Context;
 
 use super::connection::request_connection::RequestConnection;
 use super::connection::RequestConnectionEnum;
-use super::{InboundChannel, InboundMessage, InboundMessageNew, Morph};
+use super::{InboundChannel, InboundMessage, InboundMessageNew, Activate};
 
-/// A request hub is used to send requests to other actors.
-pub trait RequestHub<M: InboundMessage>: Send + Sync + 'static + Morph {
+/// A request hub is used to send requests to other actors which will reply later.
+pub trait RequestHub<M: InboundMessage>: Send + Sync + 'static + Activate {
     /// Create a new request hub for an actor.
-    fn from_context_and_parent(actor_name: &str, sender: &tokio::sync::mpsc::Sender<M>) -> Self;
+    fn from_parent_and_sender(actor_name: &str, sender: &tokio::sync::mpsc::Sender<M>) -> Self;
 }
 
 /// A request message with a reply channel.
@@ -70,18 +70,18 @@ pub struct ReplyMessage<Reply> {
     pub reply: Reply,
 }
 
-/// OutboundChannel is a connections for messages which are sent to a downstream actor.
+/// RequestChannel is a connections for messages which are sent to a downstream actor.
 pub struct RequestChannel<Request, Reply, M: InboundMessage> {
-    /// Unique name of the outbound.
+    /// Unique name of the request channel.
     pub name: String,
-    /// Name of the actor that sends the outbound messages.
+    /// Name of the actor that sends the request messages.
     pub actor_name: String,
-    pub(crate) connection_register: RequestConnectionEnum<RequestMessage<Request, Reply>>,
 
+    pub(crate) connection_register: RequestConnectionEnum<RequestMessage<Request, Reply>>,
     pub(crate) sender: tokio::sync::mpsc::Sender<M>,
 }
 
-impl<Request, Reply, M: InboundMessage> Morph for RequestChannel<Request, Reply, M> {
+impl<Request, Reply, M: InboundMessage> Activate for RequestChannel<Request, Reply, M> {
     fn extract(&mut self) -> Self {
         Self {
             name: self.name.clone(),
@@ -102,7 +102,7 @@ impl<
         M: InboundMessageNew<ReplyMessage<Reply>>,
     > RequestChannel<Request, Reply, M>
 {
-    /// Create a new outbound for actor in provided context.    
+    /// Create a new request channel for actor in provided context.    
     pub fn new(name: String, actor_name: &str, sender: &tokio::sync::mpsc::Sender<M>) -> Self {
         Self {
             name: name.clone(),
@@ -112,7 +112,7 @@ impl<
         }
     }
 
-    /// Connect the outbound channel from this actor to the inbound channel of another actor.
+    /// Connect the request channel from this actor to the inbound channel of another actor.
     pub fn connect<Me: InboundMessageNew<RequestMessage<Request, Reply>>>(
         &mut self,
         _ctx: &mut Context,
@@ -125,7 +125,7 @@ impl<
         }));
     }
 
-    /// Send a message to the connected inbound channels to other actors.
+    /// Send a message to the connected inbound channels of other actors.
     pub fn send_request(&self, msg: Request) {
         let (sender, receiver) = tokio::sync::oneshot::channel();
         let msg = RequestMessage {
@@ -149,12 +149,12 @@ impl<
 pub struct NullRequest {}
 
 impl<M: InboundMessage> RequestHub<M> for NullRequest {
-    fn from_context_and_parent(_actor_name: &str, _sender: &tokio::sync::mpsc::Sender<M>) -> Self {
+    fn from_parent_and_sender(_actor_name: &str, _sender: &tokio::sync::mpsc::Sender<M>) -> Self {
         Self {}
     }
 }
 
-impl Morph for NullRequest {
+impl Activate for NullRequest {
     fn extract(&mut self) -> Self {
         Self {}
     }
diff --git a/src/core/runner.rs b/src/core/runner.rs
index f753491..357c2de 100644
--- a/src/core/runner.rs
+++ b/src/core/runner.rs
@@ -1,5 +1,3 @@
-
-
 use crate::core::{
     inbound::{InboundHub, InboundMessage},
     outbound::OutboundHub,
diff --git a/src/core/value.rs b/src/core/value.rs
index 2ef32ff..29c8218 100644
--- a/src/core/value.rs
+++ b/src/core/value.rs
@@ -2,8 +2,6 @@
 #[derive(Clone, Debug, Default)]
 pub struct NullState {}
 
-
 /// Empty prop - for actors without props.
 #[derive(Clone, Debug, Default)]
 pub struct NullProp {}
-
diff --git a/src/example_actors.rs b/src/example_actors.rs
index de0975d..a34ea83 100644
--- a/src/example_actors.rs
+++ b/src/example_actors.rs
@@ -4,7 +4,7 @@
 pub mod moving_average;
 
 /// One dimensional robot Kalman filter example.
-/// 
+///
 /// ```text
 /// *   Periodic_0   *
 /// |   time_stamp   |
@@ -27,5 +27,3 @@ pub mod moving_average;
 /// *Printer(filter s*                  *  DrawActor_0   *
 /// ```
 pub mod one_dim_robot;
-
-
diff --git a/src/example_actors/moving_average.rs b/src/example_actors/moving_average.rs
index 7da0c3e..ce90990 100644
--- a/src/example_actors/moving_average.rs
+++ b/src/example_actors/moving_average.rs
@@ -3,7 +3,7 @@ use crate::macros::*;
 // needed for actor_outputs macro
 pub use crate::compute::Context;
 use crate::core::request::NullRequest;
-pub use crate::core::{Morph, OutboundChannel, OutboundHub};
+pub use crate::core::{Activate, OutboundChannel, OutboundHub};
 
 // needed for actor_inputs macro
 pub use crate::core::{
@@ -11,7 +11,7 @@ pub use crate::core::{
 };
 
 // needed for actor macro
-pub use crate::core::{Actor, FromPropState, DefaultRunner};
+pub use crate::core::{Actor, DefaultRunner, FromPropState};
 
 /// Outbound hub for the MovingAverage.
 #[actor_outputs]
diff --git a/src/example_actors/one_dim_robot.rs b/src/example_actors/one_dim_robot.rs
index f52e873..4696920 100644
--- a/src/example_actors/one_dim_robot.rs
+++ b/src/example_actors/one_dim_robot.rs
@@ -11,9 +11,9 @@ pub mod filter;
 pub use filter::{Filter, NamedFilterState};
 
 /// Drawing actor for the one dimensional robot.
-/// 
+///
 /// Draws "ascii art" of the robot and the filter state to the console.
-/// 
+///
 /// ```text
 /// time:2.25
 ///                         ⡏⢹⢀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⡀
diff --git a/src/example_actors/one_dim_robot/draw.rs b/src/example_actors/one_dim_robot/draw.rs
index 98282dc..c85e17c 100644
--- a/src/example_actors/one_dim_robot/draw.rs
+++ b/src/example_actors/one_dim_robot/draw.rs
@@ -1,3 +1,4 @@
+use crate::actors::zip::Tuple3;
 use crate::core::request::NullRequest;
 use crate::core::InboundMessageNew;
 use crate::core::NullOutbound;
@@ -11,12 +12,8 @@ use drawille::Canvas;
 #[derive(Clone, Debug)]
 #[actor_inputs(DrawInbound, {NullProp, DrawState, NullOutbound,NullRequest})]
 pub enum DrawInboundMessage {
-    /// True position of the robot.
-    TruePos(Stamped<Robot>),
-    /// True range measurement.
-    TrueRange(Stamped<f64>),
-    /// Filter estimate of the robot's position.
-    FilterEst(NamedFilterState),
+    /// Tuple of true pos, true range and filter state
+    Zipped(Tuple3<u64, Stamped<Robot>, Stamped<f64>, NamedFilterState>),
 }
 
 /// Draw actor for one-dim-robot example.
@@ -29,45 +26,26 @@ impl OnMessage for DrawInboundMessage {
         self,
         _prop: &NullProp,
         state: &mut Self::State,
-        outbound: &Self::OutboundHub,
+        _outbound: &Self::OutboundHub,
         _request: &Self::RequestHub,
     ) {
         match self {
-            DrawInboundMessage::TruePos(msg) => {
-                state.true_pos(msg.clone(), outbound);
+            DrawInboundMessage::Zipped(msg) => {
+                state.draw(msg.item0.clone(), msg.item1.clone(), msg.item2.clone());
             }
-            DrawInboundMessage::TrueRange(msg) => {
-                state.true_range(msg.clone(), outbound);
-            }
-            DrawInboundMessage::FilterEst(msg) => {
-                state.filter_est(msg.clone(), outbound);
-            }
-        }
-    }
-}
-
-impl InboundMessageNew<Stamped<Robot>> for DrawInboundMessage {
-    fn new(inbound_channel: String, msg: Stamped<Robot>) -> Self {
-        match inbound_channel.as_str() {
-            "TruePos" => DrawInboundMessage::TruePos(msg),
-            _ => panic!("Unknown inbound name {}", inbound_channel),
         }
     }
 }
 
-impl InboundMessageNew<Stamped<f64>> for DrawInboundMessage {
-    fn new(inbound_channel: String, msg: Stamped<f64>) -> Self {
+impl InboundMessageNew<Tuple3<u64, Stamped<Robot>, Stamped<f64>, NamedFilterState>>
+    for DrawInboundMessage
+{
+    fn new(
+        inbound_channel: String,
+        msg: Tuple3<u64, Stamped<Robot>, Stamped<f64>, NamedFilterState>,
+    ) -> Self {
         match inbound_channel.as_str() {
-            "TrueRange" => DrawInboundMessage::TrueRange(msg),
-            _ => panic!("Unknown inbound name {}", inbound_channel),
-        }
-    }
-}
-
-impl InboundMessageNew<NamedFilterState> for DrawInboundMessage {
-    fn new(inbound_channel: String, msg: NamedFilterState) -> Self {
-        match inbound_channel.as_str() {
-            "FilterEst" => DrawInboundMessage::FilterEst(msg),
+            "Zipped" => DrawInboundMessage::Zipped(msg),
             _ => panic!("Unknown inbound name {}", inbound_channel),
         }
     }
@@ -75,39 +53,17 @@ impl InboundMessageNew<NamedFilterState> for DrawInboundMessage {
 
 /// State of the draw actor.
 #[derive(Clone, Debug, Default)]
-pub struct DrawState {
-    /// The most recent true position.
-    pub true_robot: Option<Stamped<Robot>>,
-    /// The most recent true range measurement.
-    pub true_range: Option<Stamped<f64>>,
-    /// The most recent filter estimate.
-    pub filter_est: Option<NamedFilterState>,
-}
+pub struct DrawState {}
 
 impl DrawState {
-    /// Visualize the true robot state by saving it to the state and invoking [DrawState::draw].
-    pub fn true_pos(&mut self, p: Stamped<Robot>, _: &NullOutbound) {
-        self.true_robot = Some(p);
-
-        self.draw();
-    }
-
-    /// Visualize the range measurement by saving it to the state and invoking [DrawState::draw].
-    pub fn true_range(&mut self, p: Stamped<f64>, _: &NullOutbound) {
-        self.true_range = Some(p);
-
-        self.draw();
-    }
-
-    /// Visualize the filter estimate by saving it to the state and invoking [DrawState::draw].
-    pub fn filter_est(&mut self, p: NamedFilterState, _: &NullOutbound) {
-        self.filter_est = Some(p);
-        self.draw();
-    }
-
     /// Draw the current state to the console if all information of the most recent timestamp is
     /// available.
-    pub fn draw(&mut self) {
+    pub fn draw(
+        &mut self,
+        true_robot: Stamped<Robot>,
+        true_range: Stamped<f64>,
+        filter_est: NamedFilterState,
+    ) {
         let factor = 6.0;
 
         let width_pixels: u32 = 100;
@@ -124,71 +80,65 @@ impl DrawState {
 
         let mut canvas = Canvas::new(width_pixels, height_pixels);
 
-        if let Some(p) = &self.true_robot {
-            let true_x = p.value.position;
-            if let Some(r) = &self.true_range {
-                if let Some(est) = &self.filter_est {
-                    if r.time == p.time && r.time == est.state.time {
-                        let x_left_ground = pixels_from_meter(true_x - 0.25, 0.0);
-                        let x_right_ground = pixels_from_meter(true_x + 0.25, 0.0);
-                        let x_left_up = pixels_from_meter(true_x - 0.25, 1.5);
-                        let x_right_up = pixels_from_meter(true_x + 0.25, 1.5);
-                        canvas.line_colored(
-                            x_left_ground.0,
-                            x_left_ground.1,
-                            x_left_up.0,
-                            x_left_up.1,
-                            drawille::PixelColor::Blue,
-                        );
-                        canvas.line_colored(
-                            x_left_up.0,
-                            x_left_up.1,
-                            x_right_up.0,
-                            x_right_up.1,
-                            drawille::PixelColor::Blue,
-                        );
-                        canvas.line_colored(
-                            x_right_ground.0,
-                            x_right_ground.1,
-                            x_right_up.0,
-                            x_right_up.1,
-                            drawille::PixelColor::Blue,
-                        );
-
-                        let range_start = pixels_from_meter(p.value.position + 0.5, 1.0);
-                        let range_endpoint = pixels_from_meter(p.value.position + r.value, 1.0);
-                        canvas.line_colored(
-                            range_start.0,
-                            range_start.1,
-                            range_endpoint.0,
-                            range_endpoint.1,
-                            drawille::PixelColor::Red,
-                        );
-
-                        let std = est.state.robot_position.covariance.sqrt();
-
-                        let believe_min = pixels_from_meter(p.value.position - 3.0 * std, 1.0);
-                        let believe_max = pixels_from_meter(p.value.position + 3.0 * std, 1.0);
-
-                        canvas.line_colored(
-                            believe_min.0,
-                            5,
-                            believe_min.0,
-                            10,
-                            drawille::PixelColor::Green,
-                        );
-                        canvas.line_colored(
-                            believe_max.0,
-                            5,
-                            believe_max.0,
-                            10,
-                            drawille::PixelColor::Green,
-                        );
-
-                        println!("time:{}\n{}", r.time, canvas.frame());
-                    }
-                }
-            }
-        }
+        let true_x = true_robot.value.position;
+        let x_left_ground = pixels_from_meter(true_x - 0.25, 0.0);
+        let x_right_ground = pixels_from_meter(true_x + 0.25, 0.0);
+        let x_left_up = pixels_from_meter(true_x - 0.25, 1.5);
+        let x_right_up = pixels_from_meter(true_x + 0.25, 1.5);
+        canvas.line_colored(
+            x_left_ground.0,
+            x_left_ground.1,
+            x_left_up.0,
+            x_left_up.1,
+            drawille::PixelColor::Blue,
+        );
+        canvas.line_colored(
+            x_left_up.0,
+            x_left_up.1,
+            x_right_up.0,
+            x_right_up.1,
+            drawille::PixelColor::Blue,
+        );
+        canvas.line_colored(
+            x_right_ground.0,
+            x_right_ground.1,
+            x_right_up.0,
+            x_right_up.1,
+            drawille::PixelColor::Blue,
+        );
+
+        let p = filter_est.state.clone();
+        let r = true_range;
+        let range_start = pixels_from_meter(p.pos_vel_acc.mean.x + 0.5, 1.0);
+        let range_endpoint = pixels_from_meter(p.pos_vel_acc.mean.x + r.value, 1.0);
+        canvas.line_colored(
+            range_start.0,
+            range_start.1,
+            range_endpoint.0,
+            range_endpoint.1,
+            drawille::PixelColor::Red,
+        );
+
+        let std = filter_est.state.pos_vel_acc.covariance[(0, 0)].sqrt();
+
+        let believe_min = pixels_from_meter(p.pos_vel_acc.mean.x - 3.0 * std, 1.0);
+        let believe_max = pixels_from_meter(p.pos_vel_acc.mean.x + 3.0 * std, 1.0);
+
+        canvas.line_colored(
+            believe_min.0,
+            5,
+            believe_min.0,
+            10,
+            drawille::PixelColor::Green,
+        );
+        canvas.line_colored(
+            believe_max.0,
+            5,
+            believe_max.0,
+            10,
+            drawille::PixelColor::Green,
+        );
+
+        println!("time:{}\n{}", r.time, canvas.frame());
     }
 }
diff --git a/src/example_actors/one_dim_robot/filter.rs b/src/example_actors/one_dim_robot/filter.rs
index 1e75c0a..bb10f31 100644
--- a/src/example_actors/one_dim_robot/filter.rs
+++ b/src/example_actors/one_dim_robot/filter.rs
@@ -3,8 +3,8 @@ use std::fmt::{Debug, Display};
 use crate::compute::Context;
 use crate::core::request::{NullRequest, RequestMessage};
 use crate::core::{
-    Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub, InboundMessage,
-    InboundMessageNew, Morph, NullProp, OnMessage, OutboundChannel, OutboundHub,
+    Activate, Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub,
+    InboundMessage, InboundMessageNew, NullProp, OnMessage, OutboundChannel, OutboundHub,
 };
 use crate::example_actors::one_dim_robot::{RangeMeasurementModel, Stamped};
 use hollywood_macros::{actor, actor_inputs, actor_outputs};
@@ -13,7 +13,7 @@ use super::sim::PingPong;
 
 /// Inbound channels for the filter actor.
 #[derive(Clone, Debug)]
-#[actor_inputs(FilterInbound,{NullProp, FilterState,FilterOutbound,NullRequest})]
+#[actor_inputs(FilterInbound,{NullProp, FilterState, FilterOutbound, NullRequest})]
 pub enum FilterInboundMessage {
     /// noisy velocity measurements
     NoisyVelocity(Stamped<f64>),
@@ -76,8 +76,10 @@ impl InboundMessageNew<RequestMessage<f64, PingPong>> for FilterInboundMessage {
 pub struct FilterState {
     /// time of the last prediction or update
     pub time: f64,
+    /// Monotonically increasing sequence number
+    pub seq: u64,
     /// belief about the robot's position
-    pub robot_position: PositionBelieve,
+    pub pos_vel_acc: PositionBelieve,
 }
 
 impl Display for FilterState {
@@ -85,7 +87,7 @@ impl Display for FilterState {
         write!(
             f,
             "(time: {}, robot_position: {})",
-            self.time, self.robot_position.mean
+            self.time, self.pos_vel_acc.mean
         )
     }
 }
@@ -116,16 +118,20 @@ impl Display for NamedFilterState {
 #[derive(Clone, Debug)]
 pub struct PositionBelieve {
     /// mean of the position believe
-    pub mean: f64,
+    pub mean: nalgebra::Vector3<f64>,
     /// covariance of the position believe
-    pub covariance: f64,
+    pub covariance: nalgebra::Matrix3<f64>,
 }
 
 impl Default for PositionBelieve {
     fn default() -> Self {
         Self {
-            mean: 0.0,
-            covariance: 100.0,
+            mean: nalgebra::Vector3::new(0.0, 0.0, 0.0),
+            covariance: nalgebra::Matrix3::new(
+                100.0, 0.0, 0.0, //
+                0.0, 100.0, 0.0, //
+                0.0, 0.0, 100.0,
+            ),
         }
     }
 }
@@ -140,9 +146,43 @@ impl FilterState {
         let dt = noisy_velocity.time - self.time;
         self.time = noisy_velocity.time;
 
-        self.robot_position.mean += noisy_velocity.value * dt;
-        const VELOCITY_STD_DEV: f64 = 0.1;
-        self.robot_position.covariance += VELOCITY_STD_DEV * VELOCITY_STD_DEV * dt;
+        // 1. Random-walk acceleration motion model
+        self.pos_vel_acc.mean[0] +=
+            self.pos_vel_acc.mean[1] * dt + 0.5 * self.pos_vel_acc.mean[2] * dt * dt;
+        self.pos_vel_acc.mean[1] += self.pos_vel_acc.mean[2] * dt;
+        let f = nalgebra::Matrix3::new(1.0, dt, 0.5 * dt * dt, 0.0, 1.0, dt, 0.0, 0.0, 1.0);
+        let acceleration_noise_variance = 0.5;
+        let q = nalgebra::Matrix3::new(
+            0.25 * dt.powi(4),
+            0.5 * dt.powi(3),
+            0.5 * dt.powi(2) * acceleration_noise_variance,
+            0.5 * dt.powi(3),
+            dt.powi(2),
+            dt * acceleration_noise_variance,
+            0.5 * dt.powi(2) * acceleration_noise_variance,
+            dt * acceleration_noise_variance,
+            acceleration_noise_variance,
+        );
+        self.pos_vel_acc.covariance = f * self.pos_vel_acc.covariance * f.transpose() + q;
+
+        // 2. Update velocity based on the velocity measurement
+        // (strictly speaking this is an update, not a prediction)
+        let h_velocity = nalgebra::Matrix1x3::new(0.0, 1.0, 0.0);
+        let predicted_velocity = self.pos_vel_acc.mean[1];
+        let innovation_velocity = noisy_velocity.value - predicted_velocity;
+        const VELOCITY_MEASUREMENT_NOISE: f64 = 0.1;
+        let r_velocity =
+            nalgebra::Matrix1::new(VELOCITY_MEASUREMENT_NOISE * VELOCITY_MEASUREMENT_NOISE);
+        let kalman_gain_velocity = self.pos_vel_acc.covariance
+            * h_velocity.transpose()
+            * (h_velocity * self.pos_vel_acc.covariance * h_velocity.transpose() + r_velocity)
+                .try_inverse()
+                .unwrap();
+        self.pos_vel_acc.mean += kalman_gain_velocity * innovation_velocity;
+        let identity = nalgebra::Matrix3::identity();
+        self.pos_vel_acc.covariance =
+            (identity - kalman_gain_velocity * h_velocity) * self.pos_vel_acc.covariance;
+
         outbound.predicted_state.send(NamedFilterState::new(
             "Predicted: ".to_owned(),
             self.clone(),
@@ -153,17 +193,21 @@ impl FilterState {
     ///
     /// Updates the robot's position based on the range measurement.
     pub fn update(&mut self, noisy_range: &Stamped<f64>, outbound: &FilterOutbound) {
-        let predicted_range = Self::RANGE_MODEL.range(self.robot_position.mean);
-        const RANGE_STD_DEV: f64 = 1.5 * RangeMeasurementModel::RANGE_STD_DEV;
-
-        let innovation = noisy_range.value - predicted_range;
-
-        let mat_h = Self::RANGE_MODEL.dx_range();
-        let innovation_covariance = self.robot_position.covariance + RANGE_STD_DEV * RANGE_STD_DEV;
-        let kalman_gain = mat_h * self.robot_position.covariance / innovation_covariance;
-        self.robot_position.mean += kalman_gain * innovation;
-        self.robot_position.covariance *= 1.0 - kalman_gain * mat_h;
-
+        // Update position based on the range measurement
+        let h = nalgebra::Matrix1x3::new(1.0, 0.0, 0.0);
+        let predicted_range = Self::RANGE_MODEL.range(self.pos_vel_acc.mean[0]);
+        let innovation = predicted_range - noisy_range.value;
+        const RANGE_STD_DEV: f64 = RangeMeasurementModel::RANGE_STD_DEV;
+        let r = nalgebra::Matrix1::new(RANGE_STD_DEV * RANGE_STD_DEV);
+        let kalman_gain = self.pos_vel_acc.covariance
+            * h.transpose()
+            * (h * self.pos_vel_acc.covariance * h.transpose() + r)
+                .try_inverse()
+                .unwrap();
+        self.pos_vel_acc.mean += kalman_gain * innovation;
+        let identity = nalgebra::Matrix3::identity();
+        self.pos_vel_acc.covariance = (identity - kalman_gain * h) * self.pos_vel_acc.covariance;
+        self.seq += 1;
         outbound
             .updated_state
             .send(NamedFilterState::new("Updated: ".to_owned(), self.clone()));
diff --git a/src/example_actors/one_dim_robot/model.rs b/src/example_actors/one_dim_robot/model.rs
index d3a032a..9c89455 100644
--- a/src/example_actors/one_dim_robot/model.rs
+++ b/src/example_actors/one_dim_robot/model.rs
@@ -5,15 +5,18 @@ use std::fmt::{Debug, Display};
 pub struct Stamped<T: Clone + Debug> {
     /// Timestamp of the value.
     pub time: f64,
+    /// Monotonic sequence counter
+    pub seq: u64,
     /// The value.
     pub value: T,
 }
 
 impl<T: Clone + Debug> Stamped<T> {
     /// Creates a new value with a timestamp.
-    pub fn from_stamp_and_value(time: f64, value: &T) -> Self {
+    pub fn from_stamp_counter_and_value(time: f64, seq: u64, value: &T) -> Self {
         Self {
             time,
+            seq,
             value: value.clone(),
         }
     }
diff --git a/src/example_actors/one_dim_robot/sim.rs b/src/example_actors/one_dim_robot/sim.rs
index 9c7058a..70ab8c3 100644
--- a/src/example_actors/one_dim_robot/sim.rs
+++ b/src/example_actors/one_dim_robot/sim.rs
@@ -5,15 +5,18 @@ use rand_distr::{Distribution, Normal};
 use crate::compute::Context;
 use crate::core::request::{ReplyMessage, RequestChannel, RequestHub};
 use crate::core::{
-    Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub, InboundMessage,
-    InboundMessageNew, Morph, NullProp, OnMessage, OutboundChannel, OutboundHub,
+    Activate, Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub,
+    InboundMessage, InboundMessageNew, NullProp, OnMessage, OutboundChannel, OutboundHub,
 };
 use crate::example_actors::one_dim_robot::{RangeMeasurementModel, Robot, Stamped};
 use crate::macros::*;
 
+/// Ping-pong request message.
 #[derive(Clone, Debug, Default)]
 pub struct PingPong {
+    /// time-stamp of the request message
     pub ping: f64,
+    /// time-stamp of the reply message
     pub pong: f64,
 }
 
@@ -73,6 +76,8 @@ pub struct SimState {
     pub shutdown_time: f64,
     /// Current time.
     pub time: f64,
+    /// Monotonic sequence counter
+    pub seq: u64,
     /// True position and velocity of the robot.
     pub true_robot: Robot,
 }
@@ -82,14 +87,16 @@ impl SimState {
 
     /// One step of the simulation.
     pub fn process_time_stamp(&mut self, time: f64, outbound: &SimOutbound, request: &SimRequest) {
+        let dt = time - self.time;
         self.time = time;
-        self.true_robot.position += self.true_robot.velocity * time;
-        self.true_robot.velocity = 0.25 * (0.25 * time).cos();
+        self.true_robot.position += self.true_robot.velocity * dt;
+        self.true_robot.velocity = 2.5 * (0.25 * time).cos();
 
         let true_range = Self::RANGE_MODEL.range(self.true_robot.position);
         const RANGE_STD_DEV: f64 = RangeMeasurementModel::RANGE_STD_DEV;
         let range_normal = Normal::new(0.0, RANGE_STD_DEV).unwrap();
-        let noisy_range = true_range + range_normal.sample(&mut rand::thread_rng());
+        let s = range_normal.sample(&mut rand::thread_rng());
+        let noisy_range = true_range + s;
 
         const VELOCITY_STD_DEV: f64 = 0.01;
         let noisy_velocity = self.true_robot.velocity
@@ -99,20 +106,41 @@ impl SimState {
 
         outbound
             .true_robot
-            .send(Stamped::from_stamp_and_value(time, &self.true_robot));
+            .send(Stamped::from_stamp_counter_and_value(
+                time,
+                self.seq,
+                &self.true_robot,
+            ));
         outbound
             .true_range
-            .send(Stamped::from_stamp_and_value(time, &true_range));
+            .send(Stamped::from_stamp_counter_and_value(
+                time,
+                self.seq,
+                &true_range,
+            ));
         outbound
             .noisy_range
-            .send(Stamped::from_stamp_and_value(time, &noisy_range));
-        outbound.true_velocity.send(Stamped::from_stamp_and_value(
-            time,
-            &self.true_robot.velocity,
-        ));
+            .send(Stamped::from_stamp_counter_and_value(
+                time,
+                self.seq,
+                &noisy_range,
+            ));
+        outbound
+            .true_velocity
+            .send(Stamped::from_stamp_counter_and_value(
+                time,
+                self.seq,
+                &self.true_robot.velocity,
+            ));
         outbound
             .noisy_velocity
-            .send(Stamped::from_stamp_and_value(time, &noisy_velocity));
+            .send(Stamped::from_stamp_counter_and_value(
+                time,
+                self.seq,
+                &noisy_velocity,
+            ));
+
+        self.seq += 1;
 
         if time == 5.0 {
             request.ping_pong.send_request(time);
@@ -138,30 +166,8 @@ pub struct SimOutbound {
 }
 
 /// Request of the simulation actor.
+#[actor_requests]
 pub struct SimRequest {
     /// Check time-stamp of receiver
     pub ping_pong: RequestChannel<f64, PingPong, SimInboundMessage>,
 }
-
-impl RequestHub<SimInboundMessage> for SimRequest {
-    fn from_context_and_parent(
-        actor_name: &str,
-        sender: &tokio::sync::mpsc::Sender<SimInboundMessage>,
-    ) -> Self {
-        Self {
-            ping_pong: RequestChannel::new(actor_name.to_owned(), "ping_pong", sender),
-        }
-    }
-}
-
-impl Morph for SimRequest {
-    fn extract(&mut self) -> Self {
-        Self {
-            ping_pong: self.ping_pong.extract(),
-        }
-    }
-
-    fn activate(&mut self) {
-        self.ping_pong.activate();
-    }
-}
diff --git a/src/introspect.rs b/src/introspect.rs
index eebf112..c66013d 100644
--- a/src/introspect.rs
+++ b/src/introspect.rs
@@ -1,3 +1,2 @@
 /// The flow graph.
 pub mod flow_graph;
-
diff --git a/src/introspect/flow_graph.rs b/src/introspect/flow_graph.rs
index 1b74bbd..9fceddc 100644
--- a/src/introspect/flow_graph.rs
+++ b/src/introspect/flow_graph.rs
@@ -297,8 +297,9 @@ impl FlowGraph {
 
                     let node_id = self
                         .topology
-                        .unique_idx_name_pairs.get_node_idx(&super_node.actor).unwrap();
-                     
+                        .unique_idx_name_pairs
+                        .get_node_idx(&super_node.actor)
+                        .unwrap();
 
                     for i in self
                         .topology
diff --git a/src/lib.rs b/src/lib.rs
index 66dd242..733cd80 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,5 @@
+#![deny(missing_docs)]
+
 //! # Hollywood
 //!
 //! Hollywood is an actor framework for Rust.
@@ -20,18 +22,19 @@
 //!
 //! The library is organized in the following modules:
 //!
-//! - The [macros] module contains the three macros that are used to define an actors with
-//!   minimal boilerplate.
 //! - The [core] module contains the core structs and traits of the library. [Actor](core::Actor)
 //!   is a generic struct that represents an actor. [InboundHub](core::InboundHub) is the trait
 //!   which represents the collection of inbound channels of an actor. Similarly,
 //!   [OutboundHub](core::OutboundHub) is the trait which represents the collection of outbound
 //!   channels of an actor.
-//!
+//! 
 //!   Most importantly, [OnMessage](core::OnMessage) is the main entry point for user code and sets
 //!   the behavior of a user-defines actor. [OnMessage::on_message()](core::OnMessage::on_message())
 //!   processes incoming messages, updates the actor's state and sends messages to downstream actors
 //!   in the pipeline.
+//! 
+//! - The [macros] module contains the three macros that are used to define an actors with
+//!   minimal boilerplate.
 //!
 //! - The [compute] module contains the [Context](compute::Context) and
 //!   [Pipeline](compute::Pipeline) which are used to configure a set of actors, connect
@@ -39,10 +42,10 @@
 //!
 //! - The [actors] module contains a set of predefined actors that can be used as part of a compute
 //!   pipelines.
-//! 
+//!
 //! - The [introspect] module contains a some visualization tools to inspect the compute pipeline.
 //!
-//! - The [examples] module contains a set of examples actors that demonstrate how to use the library.
+//! - The [example_actors] module contains a set of examples actors that demonstrate how to use the library.
 //!
 //! ## Example: moving average
 //!
@@ -98,7 +101,7 @@
 //! Note that MovingAverageState implements [Default] trait through the derive macro, and hence
 //! moving_average is initialized to 0.0 which is the default value for f64. An explicit
 //! implementation of the [Default] trait can be used to set the values of member fields as done for
-//! the [examples::moving_average::MovingAverageProp] struct here.
+//! the [example_actors::moving_average::MovingAverageProp] struct here.
 //!
 //! ### Inbound hub
 //!
@@ -303,8 +306,9 @@ pub mod example_actors;
 /// following order:
 ///
 /// 1. [actor_outputs](macros::actor_outputs)
-/// 2. [actor_inputs](macros::actor_inputs) which depends on 1.
-/// 3. [actor](macros::actor) which depends on 1. and 2.
+/// 2. [actor_requests](macros::actor_requests)
+/// 3. [actor_inputs](macros::actor_inputs) which depends on 1. and 2.
+/// 4. [actor](macros::actor) which depends on 1., 2. and 3.
 ///
 /// The documentation in this module is rather technical. For a more practical introduction, please
 /// refer to the examples in the root of the [crate](crate#example-moving-average).
@@ -330,13 +334,36 @@ pub mod macros {
     /// user-specified name CHANNEL* and a user specified type TYPE*.
     ///
     /// Effect: The macro generates the [OutboundHub](crate::core::OutboundHub) and
-    /// [Morph](crate::core::Morph) implementations for the provided struct OUTBOUND.
+    /// [Activate](crate::core::Activate) implementations for the provided struct OUTBOUND.
     ///
     /// This is the first of three macros to define an actor. The other two are [macro@actor_inputs]
     /// and [macro@actor].
     ///
     pub use hollywood_macros::actor_outputs;
 
+    /// This macro generates the boilerplate for the request hub struct it is applied to.
+    ///
+    /// Macro template:
+    ///
+    /// ``` text
+    /// #[actor_requests]
+    /// pub struct REQUEST {
+    ///     pub CHANNEL0: RequestChannel<REQ_TYPE0, REPL_TYPE0, M0>,
+    ///     pub CHANNEL1: RequestChannel<REQ_TYPE1, REPL_TYPE2, M1>,
+    ///     ...
+    /// }
+    /// ```
+    ///
+    /// Here, REQUEST is the user-specified name of the struct. The struct shall be defined right
+    /// after the macro invocation. The request struct consists of one or more request channels. 
+    /// Each request channel has name CHANNEL*, a request type REQ_TYPE*, a reply type REPL_TYPE*,
+    /// and a message type M*.
+    ///
+    /// Effect: The macro generates the [RequestHub](crate::core::RequestHub) and
+    /// [Activate](crate::core::Activate) implementations for the provided struct REQUEST.
+    ///
+    pub use hollywood_macros::actor_requests;
+
     /// This macro generates the boilerplate for the inbound hub of an actor.
     ///
     /// Macro template:
@@ -357,7 +384,9 @@ pub mod macros {
     ///
     /// Prerequisite:
     ///   - The OUTBOUND struct is defined and implements [OutboundHub](crate::core::OutboundHub)
-    ///     and [Morph](crate::core::Morph), typically using the [macro@actor_outputs] macro.
+    ///     and [Activate](crate::core::Activate), typically using the [macro@actor_outputs] macro.
+     ///  - The REQUEST struct is defined and implements [RequestHub](crate::core::RequestHub) and
+    ///     [Activate](crate::core::Activate), e.g. using the [actor_requests] macro.
     ///   - The PROP and STATE structs are defined.
     ///
     /// Effects:
@@ -366,8 +395,6 @@ pub mod macros {
     ///     [InboundHub](crate::core::InboundHub) trait for it.
     ///   - Implements the [InboundMessage](crate::core::InboundMessage) trait for INBOUND_MESSAGE.
     ///
-    /// This is the second of three macros to define an actor. The other two are
-    /// [macro@actor_outputs] and [macro@actor].
     pub use hollywood_macros::actor_inputs;
 
     /// This macro generates the boilerplate to define an new actor type.
@@ -376,7 +403,7 @@ pub mod macros {
     ///
     /// ``` text
     /// #[actor(INBOUND_MESSAGE)]
-    /// type ACTOR = Actor<PROP, INBOUND, STATE, OUTBOUND>;
+    /// type ACTOR = Actor<PROP, INBOUND, STATE, OUTBOUND, REQUEST>;
     /// ```
     ///
     /// Here, ACTOR is the user-specified name of the actor type. The actor type shall be defined
@@ -384,7 +411,9 @@ pub mod macros {
     ///
     /// Prerequisites:
     ///   - The OUTBOUND struct is defined and implements (OutboundHub)[crate::core::OutboundHub]
-    ///     and [Morph](crate::core::Morph), e.g. using the [actor_outputs] macro.
+    ///     and [Activate](crate::core::Activate), e.g. using the [actor_outputs] macro.
+    ///   - The REQUEST struct is defined and implements [RequestHub](crate::core::RequestHub) and
+    ///     [Activate](crate::core::Activate), e.g. using the [actor_requests] macro.
     ///   - The INBOUND_MESSAGE enum is defined and implements
     ///     [InboundMessage](crate::core::InboundMessage), as well as the INBOUND
     ///     struct is defined and implements the [InboundHub](crate::core::InboundHub) trait, e.g.
@@ -395,7 +424,6 @@ pub mod macros {
     ///   - This macro implements the [FromPropState](crate::core::FromPropState) trait for the ACTOR
     ///     type.
     ///
-    /// This is the last of the three macros that need to be used to define a new actor type. The
-    /// first one is [macro@actor_outputs], the second one is [macro@actor_inputs].
     pub use hollywood_macros::actor;
+
 }