From 9d1f8dc946a5e0ec85dc1e0d77e157d9b4ad1544 Mon Sep 17 00:00:00 2001 From: Tiburso Date: Fri, 17 Jan 2025 16:21:48 +0100 Subject: [PATCH] fix: update BrokerProducer to accept references for message publishing --- server/libs/common/src/brokers/core.rs | 2 +- server/libs/common/src/brokers/rabbit.rs | 2 +- server/services/manager/src/api/tasks.rs | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/server/libs/common/src/brokers/core.rs b/server/libs/common/src/brokers/core.rs index 87bb1df..ef32cfb 100644 --- a/server/libs/common/src/brokers/core.rs +++ b/server/libs/common/src/brokers/core.rs @@ -18,5 +18,5 @@ pub trait BrokerConsumer { #[automock] #[async_trait] pub trait BrokerProducer: Send + Sync + Debug { - async fn publish_message(&self, message: T) -> Result<(), Box>; + async fn publish_message(&self, message: &T) -> Result<(), Box>; } diff --git a/server/libs/common/src/brokers/rabbit.rs b/server/libs/common/src/brokers/rabbit.rs index 529e03c..8c68dee 100644 --- a/server/libs/common/src/brokers/rabbit.rs +++ b/server/libs/common/src/brokers/rabbit.rs @@ -178,7 +178,7 @@ impl TaskInstanceRabbitMQProducer { #[async_trait] impl BrokerProducer for TaskInstanceRabbitMQProducer { - async fn publish_message(&self, task: TaskInstance) -> Result<(), Box> { + async fn publish_message(&self, task: &TaskInstance) -> Result<(), Box> { let payload = serde_json::to_vec(&task.input_data)?; // Need to change the routing key to the task kind, future will be worker kind diff --git a/server/services/manager/src/api/tasks.rs b/server/services/manager/src/api/tasks.rs index 7b5a273..8818bd8 100644 --- a/server/services/manager/src/api/tasks.rs +++ b/server/services/manager/src/api/tasks.rs @@ -9,7 +9,7 @@ use tracing::{error, info}; use utoipa::ToSchema; use uuid::Uuid; -use common::{models::TaskInstance, TaskStatus}; +use common::{brokers::core::BrokerProducer, models::TaskInstance, TaskStatus}; use crate::{ repo::{TaskInstanceRepository, TaskKindRepository}, @@ -97,7 +97,7 @@ struct CreateTaskInput { tag = "tasks" )] async fn create_task( - State(mut state): State, + State(state): State, Json(task_input): Json, ) -> Result<(StatusCode, Json), (StatusCode, String)> { info!("Creating task with kind: {:?}", task_input.task_kind_name); @@ -129,7 +129,7 @@ async fn create_task( })?; // Send the task to the worker queue - let worker_id = state.broker.publish(&task).await.map_err(|e| { + state.broker.publish_message(&task).await.map_err(|e| { error!("Failed to publish task to broker: {:?}", e); ( StatusCode::INTERNAL_SERVER_ERROR, @@ -140,7 +140,7 @@ async fn create_task( // Assign the task the worker state .task_repository - .assign_task_to_worker(&task.id, &worker_id) + .assign_task_to_worker(&task.id, &Uuid::nil()) // currently this is empty as we don't have a worker returning from the broker anymore .await .map_err(|e| { error!("Failed to assign task to worker: {:?}", e);