Skip to content

Commit

Permalink
[fixed] #1: Automatically reconnect after connection has failed
Browse files Browse the repository at this point in the history
Updated the source code to identify closed sessions or transaction. But mostly important, I added to the documentation how to use the failover:// protocol handler. With failover:// ActiveMQ itself handles a broken connection.
  • Loading branch information
ckl committed Sep 16, 2015
1 parent baf48d6 commit f720d75
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 5 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,22 @@ Das Plugin wird in der monitord.xml folgendermaßen konfiguriert:

Bitte beachten: Der XML-Parser von monitord unterstützt *keine* leeren Tags (also <tag/> oder <tag></tag>). Sollte die Konfiguration dennoch einen solchen enthalten, gibt es einen Segmentation Fault.

##### Wiederherstellung der ActiveMQ-Verbindung
Je nach Einsatz kann es sein, dass die Verbindung zwischen monitord und dem ActiveMQ-Broker abbricht. Dies kann zum Beispiel auftreten, wenn es Probleme mit der TCP-Verbindung gibt oder aber der Broker zwischenzeitlich neugestartet worden ist.
Um darauf zu reagieren, kann der ActiveMQ-Client ein Failover nutzen:

<monitordconfig version="1.0">
<!-- ... -->
<brokerUri>failover://(tcp://127.0.0.1:61616)</brokerUri>
<!-- ... -->
</monitordconfig>

bzw. wenn mehrere Broker genutzt werden sollen:

<monitordconfig version="1.0">
<!-- ... -->
<brokerUri>failover://(tcp://192.168.0.1:61616,192.168.0.2:61616)</brokerUri>
<!-- ... -->
</monitordconfig>


28 changes: 25 additions & 3 deletions monitord/plugins/libmplugin_activemq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,18 @@ bool MonitorPlugInActiveMQ::initializeConnection()

// create a connection
try {
m_connection = m_connectionFactory->createConnection();
m_connection = dynamic_cast<activemq::core::ActiveMQConnection*>(m_connectionFactory->createConnection());
LOG_INFO("Connection prepared")
m_connection->start();
LOG_INFO("Connection started")

if (m_bClientAck) {
m_session = m_connection->createSession(Session::CLIENT_ACKNOWLEDGE);
LOG_DEBUG("Setting: client acknowledge required")
m_session = dynamic_cast<activemq::core::ActiveMQSession*>(m_connection->createSession(Session::CLIENT_ACKNOWLEDGE));
}
else {
m_session = m_connection->createSession(Session::AUTO_ACKNOWLEDGE);
LOG_DEBUG("Setting: auto acknowledge enabled")
m_session = dynamic_cast<activemq::core::ActiveMQSession*>(m_connection->createSession(Session::AUTO_ACKNOWLEDGE));
}

m_bConnected = true;
Expand All @@ -224,10 +226,12 @@ void MonitorPlugInActiveMQ::freeConnection() {
// close open resources
try {
if (m_session != NULL) {
LOG_DEBUG("Closing session..")
m_session->close();
}

if (m_connection != NULL) {
LOG_DEBUG("Closing connection...")
m_connection->close();
}
}
Expand Down Expand Up @@ -280,6 +284,24 @@ bool MonitorPlugInActiveMQ::initProcessing(class MonitorConfiguration* configPtr
}

bool MonitorPlugInActiveMQ::establishConnection() {
// if some connection issue has occured, try to reconnect to the broker
try {
if (m_connection != NULL) {
LOG_DEBUG("Checking for closed connection")
m_connection->checkClosed();
}

if (m_session != NULL && (false == m_session->isStarted())) {
throw CMSException("Connection open but session not started");
}

}
catch (CMSException& ex) {
LOG_ERROR("Connection is broke:" << ex.getMessage())
LOG_INFO("Freeing resources for reconnecting...")
freeConnection();
}

if (m_bConnected == false) {
LOG_DEBUG("m_bConnected is false: \"" << m_bConnected << "\"")

Expand Down
6 changes: 4 additions & 2 deletions monitord/plugins/libmplugin_activemq.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include "base64.h"
#include "../MonitorLogging.h"
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/core/ActiveMQSession.h>
#include <activemq/util/Config.h>
#include <activemq/library/ActiveMQCPP.h>
#include <cms/Connection.h>
Expand Down Expand Up @@ -78,8 +80,8 @@ class MonitorPlugInActiveMQ : public MonitorPlugIn, public cms::ExceptionListene
bool m_bTopicsInitialized;

std::auto_ptr<activemq::core::ActiveMQConnectionFactory> m_connectionFactory;
cms::Connection* m_connection;
cms::Session* m_session;
activemq::core::ActiveMQConnection* m_connection;
activemq::core::ActiveMQSession* m_session;

TopicInfo m_genericTopic;
Topics m_topics;
Expand Down

0 comments on commit f720d75

Please sign in to comment.