Skip to content

Commit

Permalink
Merge pull request #74 from fraktalio/feature/domain_error_upgrade_fm…
Browse files Browse the repository at this point in the history
…odel

upgrading to the latest fmodel: domain error, orchestrating ES aggregate
  • Loading branch information
idugalic authored Jan 29, 2025
2 parents d34b087 + f63021d commit eb33e1d
Show file tree
Hide file tree
Showing 20 changed files with 427 additions and 685 deletions.
164 changes: 88 additions & 76 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ authors = ["Ivan Dugalic <[email protected]>"]


[dependencies]
fmodel-rust = "0.7.0"
# fmodel-rust = "0.7.0"
fmodel-rust = "0.8.0"
actix-cors = "0.7.0"
actix-web = "4.9.0"
actix-rt = "2.10.0"
Expand Down
48 changes: 46 additions & 2 deletions src/adapter/database/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ pub async fn list_events(
pub async fn get_latest_event(
decider_id: &String,
app: &Database,
) -> Result<EventEntity, ErrorMessage> {
) -> Result<Option<EventEntity>, ErrorMessage> {
sqlx::query_as!(
EventEntity,
"SELECT * FROM events WHERE decider_id = $1 ORDER BY events.offset DESC LIMIT 1",
decider_id
)
.fetch_one(&app.db)
.fetch_optional(&app.db)
.await
.map_err(|e| ErrorMessage {
message: e.to_string(),
Expand Down Expand Up @@ -96,6 +96,50 @@ pub async fn append_event(
})
}

#[allow(dead_code)]
pub async fn append_events(
events: &[NewEventEntity],
app: &Database, // Ensure `Database` contains `sqlx::Pool<sqlx::Postgres>`
) -> Result<Vec<EventEntity>, ErrorMessage> {
// Start a new transaction
let mut tx = app.db.begin().await.map_err(|e| ErrorMessage {
message: e.to_string(),
})?;

let mut appended_events = Vec::new();

for event in events {
let appended_event = sqlx::query_as!(
EventEntity,
"INSERT INTO events (event, event_id, decider, decider_id, data, command_id, previous_id, final)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING *",
event.event,
event.event_id,
event.decider,
event.decider_id,
event.data,
event.command_id,
event.previous_id,
event.r#final
)
.fetch_one(&mut *tx)
.await
.map_err(|e| ErrorMessage {
message: e.to_string(),
})?;

appended_events.push(appended_event);
}

// Commit the transaction
tx.commit().await.map_err(|e| ErrorMessage {
message: e.to_string(),
})?;

Ok(appended_events)
}

// ############################### QUERY SIDE ###############################

/// DB: Register a new view
Expand Down
37 changes: 16 additions & 21 deletions src/adapter/event_stream/saga_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use crate::adapter::database::error::ErrorMessage;
use crate::adapter::database::queries::{ack_event, nack_event, stream_events};
use crate::adapter::publisher::order_action_publisher::OrderActionPublisher;
use crate::adapter::repository::restaurant_event_repository::ToRestaurantEvent;
use crate::adapter::repository::event_repository::ToEvent;
use crate::application::api::OrderSagaManager;
use crate::Database;
use log::{debug, error, warn};
Expand All @@ -19,30 +19,25 @@ pub async fn stream_events_to_saga(
Ok(Some(event_entity)) => {
debug!("Processing Event in Saga: {:?}", event_entity);
match event_entity.decider.as_str() {
"Restaurant" => {
match order_saga_manager
.handle(&event_entity.to_restaurant_event()?)
"Restaurant" => match order_saga_manager.handle(&event_entity.to_event()?).await {
Ok(_) => {
debug!("Order Saga executed successfully");
ack_event(
&event_entity.offset,
&"saga".to_string(),
&event_entity.decider_id,
db,
)
.await
{
Ok(_) => {
debug!("Order Saga executed successfully");
ack_event(
&event_entity.offset,
&"saga".to_string(),
&event_entity.decider_id,
db,
)
.map(drop)
}
Err(error) => {
error!("Order Saga failed: {}", error.message);
nack_event(&"saga".to_string(), &event_entity.decider_id, db)
.await
.map(drop)
}
Err(error) => {
error!("Order Saga failed: {}", error.message);
nack_event(&"saga".to_string(), &event_entity.decider_id, db)
.await
.map(drop)
}
}
}
},
_ => {
warn!("Unknown event type: {}", event_entity.event);
ack_event(
Expand Down
7 changes: 3 additions & 4 deletions src/adapter/event_stream/view_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ use std::sync::Arc;

use crate::adapter::database::error::ErrorMessage;
use crate::adapter::database::queries::{ack_event, nack_event, stream_events};
use crate::adapter::repository::order_event_repository::ToOrderEvent;
use crate::adapter::repository::event_repository::ToEvent;
use crate::adapter::repository::order_view_state_repository::OrderViewStateRepository;
use crate::adapter::repository::restaurant_event_repository::ToRestaurantEvent;
use crate::adapter::repository::restaurant_view_state_repository::RestaurantViewStateRepository;
use crate::application::api::{OrderMaterializedView, RestaurantMaterializedView};
use crate::Database;
Expand All @@ -25,7 +24,7 @@ pub async fn stream_events_to_view(
match event_entity.decider.as_str() {
"Restaurant" => {
match restaurant_materialized_view
.handle(&event_entity.to_restaurant_event()?)
.handle(&event_entity.to_event()?)
.await
{
Ok(_) => {
Expand All @@ -52,7 +51,7 @@ pub async fn stream_events_to_view(
}
"Order" => {
match order_materialized_view
.handle(&event_entity.to_order_event()?)
.handle(&event_entity.to_event()?)
.await
{
Ok(_) => {
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/publisher/order_action_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::adapter::database::error::ErrorMessage;
use crate::adapter::repository::order_event_repository::OrderEventRepository;
use crate::adapter::repository::event_repository::AggregateEventRepository;
use crate::application::api::OrderAggregate;
use crate::domain::api::OrderCommand;
use fmodel_rust::saga_manager::ActionPublisher;
use std::sync::Arc;

/// Order action publisher - used by the Saga Manager to publish actions/commands
pub struct OrderActionPublisher<'a> {
pub order_aggregate: Arc<OrderAggregate<'a, OrderEventRepository>>,
pub order_aggregate: Arc<OrderAggregate<'a, AggregateEventRepository>>,
}

/// Fmodel action publisher implementation fot the OrderActionPublisher
Expand Down
Loading

0 comments on commit eb33e1d

Please sign in to comment.