Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout parameter to Channel::BasicPublish() #333

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/Channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -819,10 +819,10 @@ void Channel::BasicReject(const Envelope::DeliveryInfo &info, bool requeue,
AMQP_BASIC_NACK_METHOD, &req));
}

void Channel::BasicPublish(const std::string &exchange_name,
bool Channel::BasicPublish(const std::string &exchange_name,
const std::string &routing_key,
const BasicMessage::ptr_t message, bool mandatory,
bool immediate) {
bool immediate, int timeout) {
m_impl->CheckIsConnected();
amqp_channel_t channel = m_impl->GetChannel();

Expand All @@ -834,6 +834,10 @@ void Channel::BasicPublish(const std::string &exchange_name,
StringToBytes(routing_key), mandatory, immediate, &properties,
StringToBytes(message->Body())));

boost::chrono::microseconds real_timeout =
(timeout >= 0 ? boost::chrono::milliseconds(timeout)
: boost::chrono::microseconds::max());

// If we've done things correctly we can get one of 4 things back from the
// broker
// - basic.ack - our channel is in confirm mode, messsage was 'dealt with' by
Expand All @@ -850,7 +854,9 @@ void Channel::BasicPublish(const std::string &exchange_name,
AMQP_BASIC_NACK_METHOD}};
amqp_frame_t response;
boost::array<amqp_channel_t, 1> channels = {{channel}};
m_impl->GetMethodOnChannel(channels, response, PUBLISH_ACK);
if (!m_impl->GetMethodOnChannel(channels, response, PUBLISH_ACK, real_timeout)) {
return false;
}

if (AMQP_BASIC_NACK_METHOD == response.payload.method.id) {
amqp_basic_nack_t *return_method =
Expand Down Expand Up @@ -878,6 +884,8 @@ void Channel::BasicPublish(const std::string &exchange_name,

m_impl->ReturnChannel(channel);
m_impl->MaybeReleaseBuffersOnChannel(channel);

return true;
}

bool Channel::BasicGet(Envelope::ptr_t &envelope, const std::string &queue,
Expand Down
8 changes: 6 additions & 2 deletions src/SimpleAmqpClient/Channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -714,13 +714,17 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable {
* @param immediate Requires the message to be both routed to a queue, and
* immediately delivered to a consumer. If the message is not routed, or a
* consumer cannot immediately deliver the message,
* @param timeout The timeout in milliseconds for the message to be
* delivered. 0 works like a non-blocking call, -1 is an infinite timeout.
*
* a \ref MessageReturnedException is thrown. This has no effect when using
* RabbitMQ v3.0 and newer.
* @returns `true` on message delivery, `false` on timeout.
*/
void BasicPublish(const std::string &exchange_name,
bool BasicPublish(const std::string &exchange_name,
const std::string &routing_key,
const BasicMessage::ptr_t message, bool mandatory = false,
bool immediate = false);
bool immediate = false, int timeout = -1);

/**
* Synchronously consume a message from a queue
Expand Down