From e434b5ff28c8adac47a25d2f66dbd895cf45e88d Mon Sep 17 00:00:00 2001 From: legoc Date: Fri, 3 Nov 2023 16:19:10 +0100 Subject: [PATCH] Added timeout parameter to Channel::BasicPublish() --- src/Channel.cpp | 14 +++++++++++--- src/SimpleAmqpClient/Channel.h | 8 ++++++-- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/Channel.cpp b/src/Channel.cpp index 8950786..0c5cb9b 100644 --- a/src/Channel.cpp +++ b/src/Channel.cpp @@ -819,10 +819,10 @@ void Channel::BasicReject(const Envelope::DeliveryInfo &info, bool requeue, AMQP_BASIC_NACK_METHOD, &req)); } -void Channel::BasicPublish(const std::string &exchange_name, +bool Channel::BasicPublish(const std::string &exchange_name, const std::string &routing_key, const BasicMessage::ptr_t message, bool mandatory, - bool immediate) { + bool immediate, int timeout) { m_impl->CheckIsConnected(); amqp_channel_t channel = m_impl->GetChannel(); @@ -834,6 +834,10 @@ void Channel::BasicPublish(const std::string &exchange_name, StringToBytes(routing_key), mandatory, immediate, &properties, StringToBytes(message->Body()))); + boost::chrono::microseconds real_timeout = + (timeout >= 0 ? boost::chrono::milliseconds(timeout) + : boost::chrono::microseconds::max()); + // If we've done things correctly we can get one of 4 things back from the // broker // - basic.ack - our channel is in confirm mode, messsage was 'dealt with' by @@ -850,7 +854,9 @@ void Channel::BasicPublish(const std::string &exchange_name, AMQP_BASIC_NACK_METHOD}}; amqp_frame_t response; boost::array channels = {{channel}}; - m_impl->GetMethodOnChannel(channels, response, PUBLISH_ACK); + if (!m_impl->GetMethodOnChannel(channels, response, PUBLISH_ACK, real_timeout)) { + return false; + } if (AMQP_BASIC_NACK_METHOD == response.payload.method.id) { amqp_basic_nack_t *return_method = @@ -878,6 +884,8 @@ void Channel::BasicPublish(const std::string &exchange_name, m_impl->ReturnChannel(channel); m_impl->MaybeReleaseBuffersOnChannel(channel); + + return true; } bool Channel::BasicGet(Envelope::ptr_t &envelope, const std::string &queue, diff --git a/src/SimpleAmqpClient/Channel.h b/src/SimpleAmqpClient/Channel.h index ee65bef..bda85a4 100644 --- a/src/SimpleAmqpClient/Channel.h +++ b/src/SimpleAmqpClient/Channel.h @@ -714,13 +714,17 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable { * @param immediate Requires the message to be both routed to a queue, and * immediately delivered to a consumer. If the message is not routed, or a * consumer cannot immediately deliver the message, + * @param timeout The timeout in milliseconds for the message to be + * delivered. 0 works like a non-blocking call, -1 is an infinite timeout. + * * a \ref MessageReturnedException is thrown. This has no effect when using * RabbitMQ v3.0 and newer. + * @returns `true` on message delivery, `false` on timeout. */ - void BasicPublish(const std::string &exchange_name, + bool BasicPublish(const std::string &exchange_name, const std::string &routing_key, const BasicMessage::ptr_t message, bool mandatory = false, - bool immediate = false); + bool immediate = false, int timeout = -1); /** * Synchronously consume a message from a queue