From ce47fe42f09a27436a5208ea7eee522452a62835 Mon Sep 17 00:00:00 2001 From: Arsile Date: Thu, 25 Feb 2021 11:26:21 +0300 Subject: [PATCH 1/5] WIP [skip ci] --- cqrs/src/lib.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/cqrs/src/lib.rs b/cqrs/src/lib.rs index bb3a95f..6b7c5fb 100644 --- a/cqrs/src/lib.rs +++ b/cqrs/src/lib.rs @@ -64,6 +64,7 @@ pub use self::{ }, lifecycle::BorrowableAsContext, }; +use std::iter::FromIterator; #[async_trait(?Send)] pub trait CommandGateway { @@ -71,6 +72,19 @@ pub trait CommandGateway { type Ok; async fn send(&self, cmd: Cmd, meta: Mt) -> Result; + + async fn send_many(&self, cmds: Vec, meta: Mt) -> Result, Self::Err> + where + Mt: Clone + 'async_trait, + Cmd: 'async_trait, + { + let mut res = Vec::new(); + + for cmd in cmds { + res.push(self.send(cmd, meta.clone()).await?) + } + Ok(res) + } } #[async_trait(?Send)] From 47c01ab538076ab0049ff353a5f9c629b7820a73 Mon Sep 17 00:00:00 2001 From: Arsile Date: Fri, 5 Mar 2021 12:02:26 +0300 Subject: [PATCH 2/5] WIP 1 [skip ci] --- cqrs/src/lib.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/cqrs/src/lib.rs b/cqrs/src/lib.rs index 6b7c5fb..990c35d 100644 --- a/cqrs/src/lib.rs +++ b/cqrs/src/lib.rs @@ -73,15 +73,24 @@ pub trait CommandGateway { async fn send(&self, cmd: Cmd, meta: Mt) -> Result; - async fn send_many(&self, cmds: Vec, meta: Mt) -> Result, Self::Err> + async fn send_many( + &self, + cmds: Vec, + meta: Mt + ) -> Result< + Vec<(Option<::Id>, Self::Ok)>, + Self::Err, + > where Mt: Clone + 'async_trait, Cmd: 'async_trait, + ::Id: Clone { let mut res = Vec::new(); for cmd in cmds { - res.push(self.send(cmd, meta.clone()).await?) + let id = cmd.aggregate_id().cloned(); + res.push((id, self.send(cmd, meta.clone()).await?)) } Ok(res) } From 9ef361e34de52182f20ad364b8a9493f11359a25 Mon Sep 17 00:00:00 2001 From: Arsile Date: Mon, 8 Mar 2021 16:48:28 +0300 Subject: [PATCH 3/5] WIP --- cqrs/src/lib.rs | 63 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/cqrs/src/lib.rs b/cqrs/src/lib.rs index 990c35d..368c780 100644 --- a/cqrs/src/lib.rs +++ b/cqrs/src/lib.rs @@ -50,6 +50,7 @@ mod event_processing; pub mod lifecycle; use async_trait::async_trait; +use futures::future; #[doc(inline)] pub use cqrs_codegen::*; @@ -64,7 +65,6 @@ pub use self::{ }, lifecycle::BorrowableAsContext, }; -use std::iter::FromIterator; #[async_trait(?Send)] pub trait CommandGateway { @@ -73,28 +73,59 @@ pub trait CommandGateway { async fn send(&self, cmd: Cmd, meta: Mt) -> Result; + /* + async fn send_many(&self, cmds: CMDs, meta: Mt) -> Result, Self::Err> + where + Mt: Clone + 'async_trait, + Cmd: 'async_trait, + CMDs: IntoIterator + { + let futures: Vec<_> = cmds + .into_iter() + .map(|cmd| self.send(cmd, meta.clone())) + .collect(); + + Ok(future::try_join_all(futures).await?) + } + */ +} + +#[async_trait(?Send)] +pub trait BatchCommandGateway, Mt> { + type Err; + type Ok; + + async fn send_many(&self, cmds: CMDs, meta: Mt) -> Result, Self::Err> + where + Cmd: 'async_trait, + CMDs: 'async_trait; +} + +/* +#[async_trait(?Send)] +impl BatchCommandGateway, Mt> for T + where + T: CommandGateway, + Cmd: Command, + Mt: Clone, +{ + type Err = T::Err; + type Ok = T::Ok; + async fn send_many( &self, cmds: Vec, meta: Mt - ) -> Result< - Vec<(Option<::Id>, Self::Ok)>, - Self::Err, - > - where - Mt: Clone + 'async_trait, - Cmd: 'async_trait, - ::Id: Clone - { - let mut res = Vec::new(); + ) -> Result, Self::Err> { + let futures: Vec<_> = cmds + .into_iter() + .map(|cmd| self.send(cmd, meta.clone())) + .collect(); - for cmd in cmds { - let id = cmd.aggregate_id().cloned(); - res.push((id, self.send(cmd, meta.clone()).await?)) - } - Ok(res) + Ok(future::try_join_all(futures).await?) } } +*/ #[async_trait(?Send)] pub trait CommandBus { From 66e72d4e59d0fc4db51e830d8578cf9aef195980 Mon Sep 17 00:00:00 2001 From: Arsile Date: Tue, 9 Mar 2021 09:02:57 +0300 Subject: [PATCH 4/5] Small corrections --- cqrs/src/lib.rs | 42 ------------------------------------------ 1 file changed, 42 deletions(-) diff --git a/cqrs/src/lib.rs b/cqrs/src/lib.rs index 368c780..aa47ab5 100644 --- a/cqrs/src/lib.rs +++ b/cqrs/src/lib.rs @@ -72,22 +72,6 @@ pub trait CommandGateway { type Ok; async fn send(&self, cmd: Cmd, meta: Mt) -> Result; - - /* - async fn send_many(&self, cmds: CMDs, meta: Mt) -> Result, Self::Err> - where - Mt: Clone + 'async_trait, - Cmd: 'async_trait, - CMDs: IntoIterator - { - let futures: Vec<_> = cmds - .into_iter() - .map(|cmd| self.send(cmd, meta.clone())) - .collect(); - - Ok(future::try_join_all(futures).await?) - } - */ } #[async_trait(?Send)] @@ -101,32 +85,6 @@ pub trait BatchCommandGateway, Mt> CMDs: 'async_trait; } -/* -#[async_trait(?Send)] -impl BatchCommandGateway, Mt> for T - where - T: CommandGateway, - Cmd: Command, - Mt: Clone, -{ - type Err = T::Err; - type Ok = T::Ok; - - async fn send_many( - &self, - cmds: Vec, - meta: Mt - ) -> Result, Self::Err> { - let futures: Vec<_> = cmds - .into_iter() - .map(|cmd| self.send(cmd, meta.clone())) - .collect(); - - Ok(future::try_join_all(futures).await?) - } -} -*/ - #[async_trait(?Send)] pub trait CommandBus { type Err; From 5cfadfba718e94929f75ffcd2dbf831b55273de3 Mon Sep 17 00:00:00 2001 From: Arsile Date: Tue, 9 Mar 2021 09:13:54 +0300 Subject: [PATCH 5/5] Add 'BatchCommandGateway' trait to support command batching --- cqrs/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/cqrs/src/lib.rs b/cqrs/src/lib.rs index aa47ab5..2509239 100644 --- a/cqrs/src/lib.rs +++ b/cqrs/src/lib.rs @@ -50,7 +50,6 @@ mod event_processing; pub mod lifecycle; use async_trait::async_trait; -use futures::future; #[doc(inline)] pub use cqrs_codegen::*;