Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add heartbeat on OpenOpts #352

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions src/Channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ bool Channel::OpenOpts::TLSParams::operator==(const TLSParams &o) const {

bool Channel::OpenOpts::operator==(const OpenOpts &o) const {
return host == o.host && vhost == o.vhost && port == o.port &&
frame_max == o.frame_max && auth == o.auth &&
tls_params == o.tls_params;
frame_max == o.frame_max && heartbeat == o.heartbeat &&
auth == o.auth && tls_params == o.tls_params;
}

Channel::ptr_t Channel::Open(const OpenOpts &opts) {
Expand All @@ -203,14 +203,14 @@ Channel::ptr_t Channel::Open(const OpenOpts &opts) {
boost::get<OpenOpts::BasicAuth>(opts.auth);
return boost::make_shared<Channel>(
OpenChannel(opts.host, opts.port, auth.username, auth.password,
opts.vhost, opts.frame_max, false));
opts.vhost, opts.frame_max, opts.heartbeat, false));
}
case 1: {
const OpenOpts::ExternalSaslAuth &auth =
boost::get<OpenOpts::ExternalSaslAuth>(opts.auth);
return boost::make_shared<Channel>(
OpenChannel(opts.host, opts.port, auth.identity, "", opts.vhost,
opts.frame_max, true));
opts.frame_max, opts.heartbeat, true));
}
default:
throw std::logic_error("Unhandled auth type");
Expand All @@ -222,14 +222,14 @@ Channel::ptr_t Channel::Open(const OpenOpts &opts) {
boost::get<OpenOpts::BasicAuth>(opts.auth);
return boost::make_shared<Channel>(OpenSecureChannel(
opts.host, opts.port, auth.username, auth.password, opts.vhost,
opts.frame_max, opts.tls_params.get(), false));
opts.frame_max, opts.heartbeat, opts.tls_params.get(), false));
}
case 1: {
const OpenOpts::ExternalSaslAuth &auth =
boost::get<OpenOpts::ExternalSaslAuth>(opts.auth);
return boost::make_shared<Channel>(
OpenSecureChannel(opts.host, opts.port, auth.identity, "", opts.vhost,
opts.frame_max, opts.tls_params.get(), true));
return boost::make_shared<Channel>(OpenSecureChannel(
opts.host, opts.port, auth.identity, "", opts.vhost, opts.frame_max,
opts.heartbeat, opts.tls_params.get(), true));
}
default:
throw std::logic_error("Unhandled auth type");
Expand Down Expand Up @@ -374,7 +374,8 @@ Channel::ChannelImpl *Channel::OpenChannel(const std::string &host, int port,
const std::string &username,
const std::string &password,
const std::string &vhost,
int frame_max, bool sasl_external) {
int frame_max, int heartbeat,
bool sasl_external) {
ChannelImpl *impl = new ChannelImpl;
impl->m_connection = amqp_new_connection();

Expand All @@ -387,7 +388,8 @@ Channel::ChannelImpl *Channel::OpenChannel(const std::string &host, int port,
int sock = amqp_socket_open(socket, host.c_str(), port);
impl->CheckForError(sock);

impl->DoLogin(username, password, vhost, frame_max, sasl_external);
impl->DoLogin(username, password, vhost, frame_max, heartbeat,
sasl_external);
} catch (...) {
amqp_destroy_connection(impl->m_connection);
delete impl;
Expand All @@ -402,7 +404,7 @@ Channel::ChannelImpl *Channel::OpenChannel(const std::string &host, int port,
Channel::ChannelImpl *Channel::OpenSecureChannel(
const std::string &host, int port, const std::string &username,
const std::string &password, const std::string &vhost, int frame_max,
const OpenOpts::TLSParams &tls_params, bool sasl_external) {
int heartbeat, const OpenOpts::TLSParams &tls_params, bool sasl_external) {
Channel::ChannelImpl *impl = new ChannelImpl;
impl->m_connection = amqp_new_connection();
if (NULL == impl->m_connection) {
Expand Down Expand Up @@ -447,7 +449,8 @@ Channel::ChannelImpl *Channel::OpenSecureChannel(
status, "Error setting client certificate for socket");
}

impl->DoLogin(username, password, vhost, frame_max, sasl_external);
impl->DoLogin(username, password, vhost, frame_max, heartbeat,
sasl_external);
} catch (...) {
amqp_destroy_connection(impl->m_connection);
delete impl;
Expand Down Expand Up @@ -490,7 +493,7 @@ bool Channel::CheckExchangeExists(boost::string_ref exchange_name) {
amqp_frame_t frame =
m_impl->DoRpc(AMQP_EXCHANGE_DECLARE_METHOD, &declare, DECLARE_OK);
m_impl->MaybeReleaseBuffersOnChannel(frame.channel);
} catch (const NotFoundException& e) {
} catch (const NotFoundException &e) {
return false;
}
return true;
Expand Down Expand Up @@ -614,7 +617,7 @@ bool Channel::CheckQueueExists(boost::string_ref queue_name) {
amqp_frame_t frame =
m_impl->DoRpc(AMQP_QUEUE_DECLARE_METHOD, &declare, DECLARE_OK);
m_impl->MaybeReleaseBuffersOnChannel(frame.channel);
} catch (const NotFoundException& e) {
} catch (const NotFoundException &e) {
return false;
}
return true;
Expand Down
19 changes: 8 additions & 11 deletions src/ChannelImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>

#define BROKER_HEARTBEAT 0

namespace AmqpClient {

namespace {
Expand Down Expand Up @@ -122,7 +120,7 @@ Channel::ChannelImpl::~ChannelImpl() {}
void Channel::ChannelImpl::DoLogin(const std::string &username,
const std::string &password,
const std::string &vhost, int frame_max,
bool sasl_external) {
int heartbeat, bool sasl_external) {
amqp_table_entry_t capabilties[1];
amqp_table_entry_t capability_entry;
amqp_table_t client_properties;
Expand All @@ -142,15 +140,14 @@ void Channel::ChannelImpl::DoLogin(const std::string &username,

if (sasl_external) {
CheckRpcReply(0, amqp_login_with_properties(
m_connection, vhost.c_str(), 0, frame_max,
BROKER_HEARTBEAT, &client_properties,
AMQP_SASL_METHOD_EXTERNAL, username.c_str()));
m_connection, vhost.c_str(), 0, frame_max, heartbeat,
&client_properties, AMQP_SASL_METHOD_EXTERNAL,
username.c_str()));
} else {
CheckRpcReply(
0, amqp_login_with_properties(m_connection, vhost.c_str(), 0, frame_max,
BROKER_HEARTBEAT, &client_properties,
AMQP_SASL_METHOD_PLAIN, username.c_str(),
password.c_str()));
CheckRpcReply(0, amqp_login_with_properties(
m_connection, vhost.c_str(), 0, frame_max, heartbeat,
&client_properties, AMQP_SASL_METHOD_PLAIN,
username.c_str(), password.c_str()));
}

m_brokerVersion = ComputeBrokerVersion(m_connection);
Expand Down
15 changes: 7 additions & 8 deletions src/SimpleAmqpClient/Channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable {
std::string vhost; ///< Virtualhost on the broker. Default '/', required.
int port; ///< Port to connect to, default is 5672.
int frame_max; ///< Max frame size in bytes. Default 128KB.
int heartbeat; ///< Heartbeat timer in seconds. Default 0s.
/// One of BasicAuth or ExternalSaslAuth is required.
boost::variant<BasicAuth, ExternalSaslAuth> auth;
/// Connect using TLS/SSL when set, otherwise use an unencrypted channel.
Expand All @@ -128,7 +129,7 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable {
*/
static OpenOpts FromUri(const std::string &uri);

OpenOpts() : vhost("/"), port(5672), frame_max(131072) {}
OpenOpts() : vhost("/"), port(5672), frame_max(131072), heartbeat(0) {}
bool operator==(const OpenOpts &) const;
};

Expand Down Expand Up @@ -924,14 +925,12 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable {
const std::string &username,
const std::string &password,
const std::string &vhost, int frame_max,
bool sasl_external);
int heartbeat, bool sasl_external);

static ChannelImpl *OpenSecureChannel(const std::string &host, int port,
const std::string &username,
const std::string &password,
const std::string &vhost, int frame_max,
const OpenOpts::TLSParams &tls_params,
bool sasl_external);
static ChannelImpl *OpenSecureChannel(
const std::string &host, int port, const std::string &username,
const std::string &password, const std::string &vhost, int frame_max,
int heartbeat, const OpenOpts::TLSParams &tls_params, bool sasl_external);

/// PIMPL idiom
boost::scoped_ptr<ChannelImpl> m_impl;
Expand Down
2 changes: 1 addition & 1 deletion src/SimpleAmqpClient/ChannelImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class Channel::ChannelImpl : boost::noncopyable {
typedef channel_map_t::iterator channel_map_iterator_t;

void DoLogin(const std::string &username, const std::string &password,
const std::string &vhost, int frame_max,
const std::string &vhost, int frame_max, int heartbeat,
bool sasl_external = false);
amqp_channel_t GetChannel();
void ReturnChannel(amqp_channel_t channel);
Expand Down