From 1b72e08a16404da1bddd2a5681c725fc6b868f85 Mon Sep 17 00:00:00 2001 From: cirias Date: Tue, 30 Jul 2024 17:14:07 +0800 Subject: [PATCH] fix: Block the sending of control messages (#319) Co-authored-by: Sirius Fang --- src/connection.rs | 23 +++++++++++++++-------- src/consumer/engine.rs | 26 ++++++++++++++------------ src/error.rs | 7 +++++++ src/retry_op.rs | 1 + 4 files changed, 37 insertions(+), 20 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 13eaf085..c2a63a63 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -351,7 +351,7 @@ impl ConnectionSender { match ( self.registrations .unbounded_send(Register::Ping { resolver }), - self.tx.try_send(messages::ping())?, + self.tx.send(messages::ping()).await?, ) { (Ok(_), ()) => { let delay_f = self.executor.delay(self.operation_timeout); @@ -526,35 +526,42 @@ impl ConnectionSender { } #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - pub fn send_flow(&self, consumer_id: u64, message_permits: u32) -> Result<(), ConnectionError> { + pub async fn send_flow( + &self, + consumer_id: u64, + message_permits: u32, + ) -> Result<(), ConnectionError> { self.tx - .try_send(messages::flow(consumer_id, message_permits))?; + .send(messages::flow(consumer_id, message_permits)) + .await?; Ok(()) } #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - pub fn send_ack( + pub async fn send_ack( &self, consumer_id: u64, message_ids: Vec, cumulative: bool, ) -> Result<(), ConnectionError> { self.tx - .try_send(messages::ack(consumer_id, message_ids, cumulative))?; + .send(messages::ack(consumer_id, message_ids, cumulative)) + .await?; Ok(()) } #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - pub fn send_redeliver_unacknowleged_messages( + pub async fn send_redeliver_unacknowleged_messages( &self, consumer_id: u64, message_ids: Vec, ) -> Result<(), ConnectionError> { self.tx - .try_send(messages::redeliver_unacknowleged_messages( + .send(messages::redeliver_unacknowleged_messages( consumer_id, message_ids, - ))?; + )) + .await?; Ok(()) } diff --git a/src/consumer/engine.rs b/src/consumer/engine.rs index 0742a9e5..cc1d7b26 100644 --- a/src/consumer/engine.rs +++ b/src/consumer/engine.rs @@ -151,19 +151,18 @@ impl ConsumerEngine { .connection .sender() .send_flow(self.id, self.batch_size - self.remaining_messages) + .await { Ok(()) => {} Err(ConnectionError::Disconnected) => { self.reconnect().await?; self.connection .sender() - .send_flow(self.id, self.batch_size - self.remaining_messages)?; - } - Err(ConnectionError::SlowDown) => { - self.connection - .sender() - .send_flow(self.id, self.batch_size - self.remaining_messages)?; + .send_flow(self.id, self.batch_size - self.remaining_messages) + .await?; } + // we don't need to handle the SlowDown error, since send_flow waits on the + // channel to be not full Err(e) => return Err(e.into()), } self.remaining_messages = self.batch_size; @@ -178,7 +177,7 @@ impl ConsumerEngine { } } Ok(Some(EngineEvent::EngineMessage(msg))) => { - let continue_loop = self.handle_ack_opt(msg); + let continue_loop = self.handle_ack_opt(msg).await; if !continue_loop { return Ok(()); } @@ -225,14 +224,14 @@ impl ConsumerEngine { } } - fn handle_ack_opt(&mut self, ack_opt: Option>) -> bool { + async fn handle_ack_opt(&mut self, ack_opt: Option>) -> bool { match ack_opt { None => { trace!("ack channel was closed"); false } Some(EngineMessage::Ack(message_id, cumulative)) => { - self.ack(message_id, cumulative); + self.ack(message_id, cumulative).await; true } Some(EngineMessage::Nack(message_id)) => { @@ -240,6 +239,7 @@ impl ConsumerEngine { .connection .sender() .send_redeliver_unacknowleged_messages(self.id, vec![message_id.clone()]) + .await { error!( "could not ask for redelivery for message {:?}: {:?}", @@ -265,6 +265,7 @@ impl ConsumerEngine { .connection .sender() .send_redeliver_unacknowleged_messages(self.id, ids) + .await { error!("could not ask for redelivery: {:?}", e); } else { @@ -288,13 +289,14 @@ impl ConsumerEngine { } #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - fn ack(&mut self, message_id: MessageIdData, cumulative: bool) { + async fn ack(&mut self, message_id: MessageIdData, cumulative: bool) { // FIXME: this does not handle cumulative acks self.unacked_messages.remove(&message_id); let res = self .connection .sender() - .send_ack(self.id, vec![message_id], cumulative); + .send_ack(self.id, vec![message_id], cumulative) + .await; if res.is_err() { error!("ack error: {:?}", res); } @@ -510,7 +512,7 @@ impl ConsumerEngine { Error::Custom("DLQ send error".to_string()) })?; - self.ack(message_id, false); + self.ack(message_id, false).await; } _ => self.send_to_consumer(message_id, payload).await?, } diff --git a/src/error.rs b/src/error.rs index b9df7e9c..02c76324 100644 --- a/src/error.rs +++ b/src/error.rs @@ -156,6 +156,13 @@ impl From for ConnectionError { } } +impl From> for ConnectionError { + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] + fn from(_err: async_channel::SendError) -> Self { + ConnectionError::Disconnected + } +} + impl From> for ConnectionError { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn from(err: async_channel::TrySendError) -> Self { diff --git a/src/retry_op.rs b/src/retry_op.rs index 74f3f529..76244d8d 100644 --- a/src/retry_op.rs +++ b/src/retry_op.rs @@ -140,6 +140,7 @@ pub async fn retry_subscribe_consumer( connection .sender() .send_flow(consumer_id, batch_size) + .await .map_err(|err| { error!("TopicConsumer::send_flow({topic}) error: {err:?}"); Error::Consumer(ConsumerError::Connection(err))