Skip to content

Commit

Permalink
Merge pull request #121 from conglisc/master
Browse files Browse the repository at this point in the history
Enhanced consistent region support for JMSSink operator
  • Loading branch information
conglisc committed Aug 23, 2015
2 parents 39bd062 + 5a5aa39 commit 5cdfa5c
Show file tree
Hide file tree
Showing 4 changed files with 310 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,16 @@ The type of message is specified as the value of the message_class attribute in

# Behavior in a consistent region

The `JMSSink` operator can be an operator within the reachability graph of a consistent region.
The `JMSSink` operator can be an operator within the reachability graph of a operator-driven consistent region.
It cannot be the start of a consistent region.

When this operator is participating in a consistent region, the parameter **consistentRegionQueueName** must be specified.

`JMSSink` operator is using transacted session to achieve the purpose of consistent region and parameters **reconnectionPolicy**, **reconnectionBound**, **period**, **maxMessageSendRetries** and **messageSendRetryDelay** must not be specified.
This is due to the fact that if connection problem occurs, the messages sent since the last successful checkpoint would be lost and it is not necessary to continue to process message even after connection is established again.
In this case the operator needs to reset to the last checkpoint and re-send the messages again, thus re-connection policy would not apply when it is participating in a consistent region


# Exceptions

The following list describes the common types of exceptions that can occur:
Expand Down Expand Up @@ -305,6 +312,15 @@ If the **maxMessageSendRetries** is specified, you must also specify a value for
<type>int64</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>consistentRegionQueueName</name>
<description>
This is a required parameter if this operator is participating in a consistent region. This parameter specifies the queue to be used to store consistent region specific information and the operator will perform a JNDI lookup with the queue name specified at initialization state. The queue name specified must also exist on the same messaging server where this operator is establishing the connection.
</description>
<optional>true</optional>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
</parameters>
<inputPorts>
<inputPortSet>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageFormatException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
Expand Down Expand Up @@ -72,13 +73,41 @@ class JMSConnectionHelper {
// Time to wait before try to resend failed message
private final long messageRetryDelay;

// Indicate message ack mode is client or not
private final boolean useClientAckMode;

// JMS message selector
private String messageSelector;

// Timestamp of session creation
private long sessionCreationTime;

// Consistent region destination object
private Destination destCR = null;

// message producer of the CR queue
private MessageProducer producerCR = null;

// message consumer of the CR queue
private MessageConsumer consumerCR = null;

private synchronized MessageConsumer getConsumerCR() {
return consumerCR;
}

private synchronized void setConsumerCR(MessageConsumer consumerCR) {
this.consumerCR = consumerCR;
}

// getter for CR queue producer
private synchronized MessageProducer getProducerCR() {
return producerCR;
}

// setter for CR queue producer
private synchronized void setProducerCR(MessageProducer producer) {
this.producerCR = producer;
}

public long getSessionCreationTime() {
return sessionCreationTime;
Expand Down Expand Up @@ -165,9 +194,9 @@ private Connection getConnect() {
JMSConnectionHelper(ReconnectionPolicies reconnectionPolicy,
int reconnectionBound, double period, boolean isProducer,
int maxMessageRetry, long messageRetryDelay, String deliveryMode,
Metric nReconnectionAttempts, Metric nFailedInserts, Logger logger, boolean useClientAckMode) {
Metric nReconnectionAttempts, Metric nFailedInserts, Logger logger, boolean useClientAckMode, String msgSelectorCR) {
this(reconnectionPolicy, reconnectionBound, period, isProducer,
maxMessageRetry, messageRetryDelay, deliveryMode, nReconnectionAttempts, logger, useClientAckMode, null);
maxMessageRetry, messageRetryDelay, deliveryMode, nReconnectionAttempts, logger, useClientAckMode, msgSelectorCR);
this.nFailedInserts = nFailedInserts;

}
Expand All @@ -178,13 +207,19 @@ public void createInitialConnection() throws ConnectionException,
createConnection();
return;
}


// Method to create initial connection without retry
public void createInitialConnectionNoRetry() throws ConnectionException {
createConnectionNoRetry();
}

// this subroutine creates the initial jndi context by taking the mandatory
// and optional parameters

public void createAdministeredObjects(String initialContextFactory,
String providerURL, String userPrincipal, String userCredential,
String connectionFactory, String destination)
String connectionFactory, String destination, String destinationCR)
throws NamingException {

this.userPrincipal = userPrincipal;
Expand Down Expand Up @@ -215,6 +250,11 @@ public void createAdministeredObjects(String initialContextFactory,

connFactory = (ConnectionFactory) jndiContext.lookup(connectionFactory);
dest = (Destination) jndiContext.lookup(destination);

// Look up CR queue only for producer and when producer is in a CR
if(this.isProducer && this.useClientAckMode) {
destCR = (Destination) jndiContext.lookup(destinationCR);
}

return;
}
Expand Down Expand Up @@ -281,6 +321,19 @@ else if (reconnectionPolicy == ReconnectionPolicies.BoundedRetry

}
}

private synchronized void createConnectionNoRetry() throws ConnectionException {

if (!isConnectValid()) {
try {
connect(isProducer);
} catch (JMSException e) {
logger.log(LogLevel.ERROR, "Connection to JMS failed", new Object[] { e.toString() });
throw new ConnectionException(
"Connection to JMS failed. Did not try to reconnect as the policy is reconnection policy does not apply here.");
}
}
}

// this subroutine creates the connection, producer and consumer, throws a
// JMSException if it fails
Expand All @@ -307,7 +360,19 @@ private boolean connect(boolean isProducer) throws JMSException {
if (isProducer == true) {
// Its JMSSink, So we will create a producer
setProducer(getSession().createProducer(dest));


if(useClientAckMode) {
// Create producer/consumer of the CR queue
setConsumerCR(getSession().createConsumer(destCR, messageSelector));
setProducerCR(getSession().createProducer(destCR));

// Set time to live to 1 week for CR messages and delivery mode to persistent
getProducerCR().setTimeToLive(TimeUnit.MILLISECONDS.convert(7L, TimeUnit.DAYS));
getProducerCR().setDeliveryMode(DeliveryMode.PERSISTENT);
// start the connection
getConnect().start();
}

// set the delivery mode if it is specified
// default is non-persistent
if (deliveryMode == null) {
Expand Down Expand Up @@ -430,6 +495,57 @@ Message receiveMessage(boolean wait, long timeout) throws ConnectionException, I
}
}

// Send message without retry in case of failure
// i.e connection problems, this method raise the error back to caller.
// No connection or message retry will be attempted.
boolean sendMessageNoRetry(Message message) throws JMSException {

boolean res = false;

try {

// try to send the message
synchronized (getSession()) {
getProducer().send(message);
res = true;

}
}
catch (JMSException e) {
// error has occurred, log error and try sending message again
logger.log(LogLevel.WARN, "ERROR_DURING_SEND", new Object[] { e.toString() });

// If the exception is caused by message format, then we can return peacefully as connection is still good.
if(!(e instanceof MessageFormatException)) {
throw e;
}

}

if(!res) {
nFailedInserts.incrementValue(1);
}

return res;
}

// send a consistent region message to the consistent region queue
void sendCRMessage(Message message) throws JMSException {

synchronized (getSession()) {
getProducerCR().send(message);
}

}

// receive a message from consistent region queue
Message receiveCRMessage() throws JMSException {

synchronized (getSession()) {
return (getConsumerCR().receiveNoWait());
}
}

// Recovers session causing unacknowledged message to be re-delivered
public void recoverSession() throws JMSException, ConnectionException, InterruptedException {

Expand All @@ -449,6 +565,20 @@ public void recoverSession() throws JMSException, ConnectionException, Interrupt

}
}

public void commitSession() throws JMSException {

synchronized (getSession()) {
getSession().commit();
}
}

public void roolbackSession() throws JMSException {

synchronized (getSession()) {
getSession().rollback();
}
}

// close the connection
public void closeConnection() throws JMSException {
Expand Down
Loading

0 comments on commit 5cdfa5c

Please sign in to comment.