Skip to content

Commit

Permalink
fix: update BrokerProducer to accept references for message publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
Tiburso committed Jan 17, 2025
1 parent ba26912 commit 9d1f8dc
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 6 deletions.
2 changes: 1 addition & 1 deletion server/libs/common/src/brokers/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ pub trait BrokerConsumer<T: Send + Sync + 'static> {
#[automock]
#[async_trait]
pub trait BrokerProducer<T: Send + Sync>: Send + Sync + Debug {
async fn publish_message(&self, message: T) -> Result<(), Box<dyn std::error::Error>>;
async fn publish_message(&self, message: &T) -> Result<(), Box<dyn std::error::Error>>;
}
2 changes: 1 addition & 1 deletion server/libs/common/src/brokers/rabbit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl TaskInstanceRabbitMQProducer {

#[async_trait]
impl BrokerProducer<TaskInstance> for TaskInstanceRabbitMQProducer {
async fn publish_message(&self, task: TaskInstance) -> Result<(), Box<dyn std::error::Error>> {
async fn publish_message(&self, task: &TaskInstance) -> Result<(), Box<dyn std::error::Error>> {
let payload = serde_json::to_vec(&task.input_data)?;

// Need to change the routing key to the task kind, future will be worker kind
Expand Down
8 changes: 4 additions & 4 deletions server/services/manager/src/api/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing::{error, info};
use utoipa::ToSchema;
use uuid::Uuid;

use common::{models::TaskInstance, TaskStatus};
use common::{brokers::core::BrokerProducer, models::TaskInstance, TaskStatus};

use crate::{
repo::{TaskInstanceRepository, TaskKindRepository},
Expand Down Expand Up @@ -97,7 +97,7 @@ struct CreateTaskInput {
tag = "tasks"
)]
async fn create_task(
State(mut state): State<AppState>,
State(state): State<AppState>,
Json(task_input): Json<CreateTaskInput>,
) -> Result<(StatusCode, Json<TaskInstance>), (StatusCode, String)> {
info!("Creating task with kind: {:?}", task_input.task_kind_name);
Expand Down Expand Up @@ -129,7 +129,7 @@ async fn create_task(
})?;

// Send the task to the worker queue
let worker_id = state.broker.publish(&task).await.map_err(|e| {
state.broker.publish_message(&task).await.map_err(|e| {
error!("Failed to publish task to broker: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Expand All @@ -140,7 +140,7 @@ async fn create_task(
// Assign the task the worker
state
.task_repository
.assign_task_to_worker(&task.id, &worker_id)
.assign_task_to_worker(&task.id, &Uuid::nil()) // currently this is empty as we don't have a worker returning from the broker anymore
.await
.map_err(|e| {
error!("Failed to assign task to worker: {:?}", e);
Expand Down

0 comments on commit 9d1f8dc

Please sign in to comment.