diff --git a/com.ibm.streamsx.messaging/.classpath b/com.ibm.streamsx.messaging/.classpath index 0a4b1f3..54ae629 100644 --- a/com.ibm.streamsx.messaging/.classpath +++ b/com.ibm.streamsx.messaging/.classpath @@ -3,12 +3,12 @@ - - + + diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.jms/JMSSource/JMSSource.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.jms/JMSSource/JMSSource.xml index f99fe12..25e4284 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.jms/JMSSource/JMSSource.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.jms/JMSSource/JMSSource.xml @@ -83,6 +83,16 @@ The following types of exceptions can occur: * The **connectionsDocument** parameter refers to an file that does not exist. * The **connectionsDocument** parameter is not specified and the `connections.xml` file is not present in the default location. * The **messageIDOutAttrName** parameter is specified but the attribute is not found in output schema or the type of attribute is not a rstring type. + * The **jmsDestinationOutAttrName** parameter is specified but the attribute is not found in output schema or the type of attribute is not a rstring type. + * The **jmsDeliveryModeOutAttrName** parameter is specified but the attribute is not found in output schema or the type of attribute is not a int32 type. + * The **jmsExpirationOutAttrName** parameter is specified but the attribute is not found in output schema or the type of attribute is not a int64 type. + * The **jmsPriorityOutAttrName** parameter is specified but the attribute is not found in output schema or the type of attribute is not a int32 type. + * The **jmsMessageIDOutAttrName** parameter is specified but the attribute is not found in output schema or the type of attribute is not a rstring type. + * The **jmsTimestampOutAttrName** parameter is specified but the attribute is not found in output schema or the type of attribute is not a int64 type. + * The **jmsCorrelationIDOutAttrName** parameter is specified but the attribute is not found in output schema or the type of attribute is not a rstring type. + * The **jmsReplyToOutAttrName** parameter is specified but the attribute is not found in output schema or the type of attribute is not a rstring type. + * The **jmsTypeOutAttrName** parameter is specified but the attribute is not found in output schema or the type of attribute is not a rstring type. + * The **jmsRedeliveredOutAttrName** parameter is specified but the attribute is not found in output schema or the type of attribute is not a boolean type. * Run time errors that cause a message to be dropped and an error message to be logged. * The `JMSSource` operator throws an exception and discards the message in the following cases. The trace and log information for these exceptions is logged in the console logs @@ -319,13 +329,103 @@ This parameter must be greater than zero and must be set if the JMSSource operat 1 - messageIDOutAttrName + messageIDOutAttrName Output attribute on output data stream to assign message ID to, the specified attribute in output stream must be of type rstring. true rstring 1 + + + jmsDestinationOutAttrName + + Output attribute on output data stream to assign JMSDestination to, the specified attribute in output stream must be of type rstring. + + true + rstring + 1 + + + jmsDeliveryModeOutAttrName + + Output attribute on output data stream to assign JMSDeliveryMode to, the specified attribute in output stream must be of type int32. + + true + rstring + 1 + + + jmsExpirationOutAttrName + + Output attribute on output data stream to assign JMSExpiration to, the specified attribute in output stream must be of type int64. + + true + rstring + 1 + + + jmsPriorityOutAttrName + + Output attribute on output data stream to assign JMSPriority to, the specified attribute in output stream must be of type int32. + + true + rstring + 1 + + + jmsMessageIDOutAttrName + + Output attribute on output data stream to assign JMSMessageID to, the specified attribute in output stream must be of type rstring. + + true + rstring + 1 + + + jmsTimestampOutAttrName + + Output attribute on output data stream to assign JMSTimestamp to, the specified attribute in output stream must be of type int64. + + true + rstring + 1 + + + jmsCorrelationIDOutAttrName + + Output attribute on output data stream to assign JMSCorrelationID to, the specified attribute in output stream must be of type rstring. + + true + rstring + 1 + + + jmsReplyToOutAttrName + + Output attribute on output data stream to assign JMSReplyTo to, the specified attribute in output stream must be of type rstring. + + true + rstring + 1 + + + jmsTypeOutAttrName + + Output attribute on output data stream to assign JMSType to, the specified attribute in output stream must be of type rstring. + + true + rstring + 1 + + + jmsRedeliveredOutAttrName + + Output attribute on output data stream to assign JMSRedelivered to, the specified attribute in output stream must be of type boolean. + + true + rstring + 1 appConfigName diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java index 208c46c..7ebe86a 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java @@ -11,6 +11,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.logging.Logger; @@ -180,6 +181,29 @@ public void setnReconnectionAttempts(Metric nReconnectionAttempts) { private String messageIDOutAttrName = null; + private String jmsDestinationOutAttrName = null; + private String jmsDeliveryModeOutAttrName = null; + private String jmsExpirationOutAttrName = null; + private String jmsPriorityOutAttrName = null; + private String jmsMessageIDOutAttrName = null; + private String jmsTimestampOutAttrName = null; + private String jmsCorrelationIDOutAttrName = null; + private String jmsReplyToOutAttrName = null; + private String jmsTypeOutAttrName = null; + private String jmsRedeliveredOutAttrName = null; + + private static List jmsHeaderValOutAttrNames = Arrays.asList("messageIDOutAttrName", + "jmsDestinationOutAttrName", + "jmsDeliveryModeOutAttrName", + "jmsExpirationOutAttrName", + "jmsPriorityOutAttrName", + "jmsMessageIDOutAttrName", + "jmsTimestampOutAttrName", + "jmsCorrelationIDOutAttrName", + "jmsReplyToOutAttrName", + "jmsTypeOutAttrName", + "jmsRedeliveredOutAttrName"); + private Object resetLock = new Object(); // application configuration name @@ -283,6 +307,96 @@ public void setMessageIDOutAttrName(String messageIDOutAttrName) { this.messageIDOutAttrName = messageIDOutAttrName; } + public String getJmsDestinationOutAttrName() { + return jmsDestinationOutAttrName; + } + + @Parameter(optional = true, description = "The name of the output attribute to store into the JMS Header information JMSDestination" ) + public void setJmsDestinationOutAttrName(String jmsDestinationOutAttrName) { + this.jmsDestinationOutAttrName = jmsDestinationOutAttrName; + } + + public String getJmsDeliveryModeOutAttrName() { + return jmsDeliveryModeOutAttrName; + } + + @Parameter(optional = true, description = "The name of the output attribute to store into the JMS Header information JMSDeliveryMode" ) + public void setJmsDeliveryModeOutAttrName(String jmsDeliveryModeOutAttrName) { + this.jmsDeliveryModeOutAttrName = jmsDeliveryModeOutAttrName; + } + + public String getJmsExpirationOutAttrName() { + return jmsExpirationOutAttrName; + } + + @Parameter(optional = true, description = "The name of the output attribute to store into the JMS Header information JMSExpiration" ) +public void setJmsExpirationOutAttrName(String jmsExpirationOutAttrName) { + this.jmsExpirationOutAttrName = jmsExpirationOutAttrName; + } + + public String getJmsPriorityOutAttrName() { + return jmsPriorityOutAttrName; + } + + @Parameter(optional = true, description = "The name of the output attribute to store into the JMS Header information JMSPriority" ) + public void setJmsPriorityOutAttrName(String jmsPriorityOutAttrName) { + this.jmsPriorityOutAttrName = jmsPriorityOutAttrName; + } + + public String getJmsMessageIDOutAttrName() { + return jmsMessageIDOutAttrName; + } + + @Parameter(optional = true, description = "The name of the output attribute to store into the JMS Header information JMSMessageID" ) + public void setJmsMessageIDOutAttrName(String jmsMessageIDOutAttrName) { + this.jmsMessageIDOutAttrName = jmsMessageIDOutAttrName; + } + + public String getJmsTimestampOutAttrName() { + return jmsTimestampOutAttrName; + } + + @Parameter(optional = true, description = "The name of the output attribute to store into the JMS Header information JMSTimestamp" ) + public void setJmsTimestampOutAttrName(String jmsTimestampOutAttrName) { + this.jmsTimestampOutAttrName = jmsTimestampOutAttrName; + } + + public String getJmsCorrelationIDOutAttrName() { + return jmsCorrelationIDOutAttrName; + } + + @Parameter(optional = true, description = "The name of the output attribute to store into the JMS Header information JMSCorrelationID" ) + public void setJmsCorrelationIDOutAttrName(String jmsCorrelationIDOutAttrName) { + this.jmsCorrelationIDOutAttrName = jmsCorrelationIDOutAttrName; + } + + public String getJmsReplyToOutAttrName() { + return jmsReplyToOutAttrName; + } + + @Parameter(optional = true, description = "The name of the output attribute to store into the JMS Header information JMSReplyTo" ) + public void setJmsReplyToOutAttrName(String jmsReplyToOutAttrName) { + this.jmsReplyToOutAttrName = jmsReplyToOutAttrName; + } + + public String getJmsTypeOutAttrName() { + return jmsTypeOutAttrName; + } + + @Parameter(optional = true, description = "The name of the output attribute to store into the JMS Header information JMSType" ) + public void setJmsTypeOutAttrName(String jmsTypeOutAttrName) { + this.jmsTypeOutAttrName = jmsTypeOutAttrName; + } + + public String getJmsRedeliveredOutAttrName() { + return jmsRedeliveredOutAttrName; + } + + @Parameter(optional = true, description = "The name of the output attribute to store into the JMS Header information JMSRedelivered" ) + public void setJmsRedeliveredOutAttrName(String jmsRedeliveredOutAttrName) { + this.jmsRedeliveredOutAttrName = jmsRedeliveredOutAttrName; + } + public String getMessageSelector() { return messageSelector; } @@ -531,20 +645,37 @@ public static void checkParametersRuntime(OperatorContextChecker checker) { } } - if (checker.getOperatorContext().getParameterNames().contains("messageIDOutAttrName")) { //$NON-NLS-1$ - - List parameterValues = checker.getOperatorContext().getParameterValues("messageIDOutAttrName"); //$NON-NLS-1$ - String outAttributeName = parameterValues.get(0); - List> outputPorts = checker.getOperatorContext().getStreamingOutputs(); - if (outputPorts.size() > 0) - { - StreamingOutput outputPort = outputPorts.get(0); - StreamSchema streamSchema = outputPort.getStreamSchema(); - boolean check = checker.checkRequiredAttributes(streamSchema, outAttributeName); - if (check) - checker.checkAttributeType(streamSchema.getAttribute(outAttributeName), MetaType.RSTRING); + for(String jmsHeaderValOutAttrName : jmsHeaderValOutAttrNames ) { + if (checker.getOperatorContext().getParameterNames().contains(jmsHeaderValOutAttrName)) { + + List parameterValues = checker.getOperatorContext().getParameterValues(jmsHeaderValOutAttrName); + String outAttributeName = parameterValues.get(0); + List> outputPorts = checker.getOperatorContext().getStreamingOutputs(); + if (outputPorts.size() > 0) + { + StreamingOutput outputPort = outputPorts.get(0); + StreamSchema streamSchema = outputPort.getStreamSchema(); + boolean check = checker.checkRequiredAttributes(streamSchema, outAttributeName); + if (check) { + switch (jmsHeaderValOutAttrName) { + case "jmsDeliveryModeOutAttrName": + case "jmsPriorityOutAttrName": + checker.checkAttributeType(streamSchema.getAttribute(outAttributeName), MetaType.INT32); + break; + case "jmsExpirationOutAttrName": + case "jmsTimestampOutAttrName": + checker.checkAttributeType(streamSchema.getAttribute(outAttributeName), MetaType.INT64); + break; + case "jmsRedeliveredOutAttrName": + checker.checkAttributeType(streamSchema.getAttribute(outAttributeName), MetaType.BOOLEAN); + break; + default: + checker.checkAttributeType(streamSchema.getAttribute(outAttributeName), MetaType.RSTRING); + } + } + } } - } + } if((checker.getOperatorContext().getParameterNames().contains("appConfigName"))) { //$NON-NLS-1$ String appConfigName = checker.getOperatorContext().getParameterValues("appConfigName").get(0); //$NON-NLS-1$ @@ -719,18 +850,14 @@ private void registerForDataGovernance(String providerURL, String destination) { providerURL, IGovernanceConstants.ASSET_JMS_SERVER_TYPE, true, "JMSSource"); //$NON-NLS-1$ } + /* (non-Javadoc) + * @see com.ibm.streams.operator.samples.patterns.ProcessTupleProducer#process() + */ @Override protected void process() throws IOException, ConnectionException { boolean isInConsistentRegion = consistentRegionContext != null; boolean isTriggerOperator = isInConsistentRegion && consistentRegionContext.isTriggerOperator(); - int msgIdAttrIndex = -1; - - if(this.getMessageIDOutAttrName() != null) { - StreamSchema streamSchema = getOutput(0).getStreamSchema(); - msgIdAttrIndex = streamSchema.getAttributeIndex(this.getMessageIDOutAttrName()); - } - // create the initial connection. try @@ -880,9 +1007,7 @@ protected void process() throws IOException, ConnectionException break; // the message was read successfully case SUCCESSFUL_MESSAGE: - if(msgIdAttrIndex != -1 && msg.getJMSMessageID() != null) { - dataTuple.setObject(msgIdAttrIndex, new RString(msg.getJMSMessageID())); - } + handleJmsHeaderValues(msg, dataTuple); dataOutputPort.submit(dataTuple); break; default: @@ -946,6 +1071,113 @@ protected void process() throws IOException, ConnectionException } } + + /** + * Handles the values of the JMSHeader of the current message. It first checks + * if there is an attribute in the Stream to write the header value into. + * + * @param msg The current JMS message. + * @param outTuple The output tuple. + * @throws JMSException + */ + private void handleJmsHeaderValues(Message msg, OutputTuple outTuple) throws JMSException { + + int messageIDAttrIdx = -1; + int jmsDestinationAttrIdx = -1; + int jmsDeliveryModeAttrIdx = -1; + int jmsExpirationAttrIdx = -1; + int jmsPriorityAttrIdx = -1; + int jmsMessageIDAttrIdx = -1; + int jmsTimestampAttrIdx = -1; + int jmsCorrelationIDAttrIdx = -1; + int jmsReplyToAttrIdx = -1; + int jmsTypeAttrIdx = -1; + int jmsRedeliveredAttrIdx = -1; + + + StreamSchema streamSchema = getOutput(0).getStreamSchema(); + + + if(this.getMessageIDOutAttrName() != null) { + messageIDAttrIdx = streamSchema.getAttributeIndex(this.getMessageIDOutAttrName()); + } + if(messageIDAttrIdx != -1 && msg.getJMSMessageID() != null) { + outTuple.setObject(messageIDAttrIdx, new RString(msg.getJMSMessageID())); + } + + + if(this.getJmsDestinationOutAttrName() != null) { + jmsDestinationAttrIdx = streamSchema.getAttributeIndex(this.getJmsDestinationOutAttrName()); + } + if(jmsDestinationAttrIdx != -1 && msg.getJMSDestination() != null) { + outTuple.setObject(jmsDestinationAttrIdx, new RString(msg.getJMSDestination().toString())); + } + + if(this.getJmsDeliveryModeOutAttrName() != null) { + jmsDeliveryModeAttrIdx = streamSchema.getAttributeIndex(this.getJmsDeliveryModeOutAttrName()); + } + if(jmsDeliveryModeAttrIdx != -1) { + outTuple.setObject(jmsDeliveryModeAttrIdx, new Integer(msg.getJMSDeliveryMode())); + } + + if(this.getJmsExpirationOutAttrName() != null) { + jmsExpirationAttrIdx = streamSchema.getAttributeIndex(this.getJmsExpirationOutAttrName()); + } + if(jmsExpirationAttrIdx != -1) { + outTuple.setObject(jmsExpirationAttrIdx, new Long(msg.getJMSExpiration())); + } + + if(this.getJmsPriorityOutAttrName() != null) { + jmsPriorityAttrIdx = streamSchema.getAttributeIndex(this.getJmsPriorityOutAttrName()); + } + if(jmsPriorityAttrIdx != -1) { + outTuple.setObject(jmsPriorityAttrIdx, new Integer(msg.getJMSPriority())); + } + + if(this.getJmsMessageIDOutAttrName() != null) { + jmsMessageIDAttrIdx = streamSchema.getAttributeIndex(this.getJmsMessageIDOutAttrName()); + } + if(jmsMessageIDAttrIdx != -1 && msg.getJMSMessageID() != null) { + outTuple.setObject(jmsMessageIDAttrIdx, new RString(msg.getJMSMessageID())); + } + + if(this.getJmsTimestampOutAttrName() != null) { + jmsTimestampAttrIdx = streamSchema.getAttributeIndex(this.getJmsTimestampOutAttrName()); + } + if(jmsTimestampAttrIdx != -1) { + outTuple.setObject(jmsTimestampAttrIdx, new Long(msg.getJMSTimestamp())); + } + + if(this.getJmsCorrelationIDOutAttrName() != null) { + jmsCorrelationIDAttrIdx = streamSchema.getAttributeIndex(this.getJmsCorrelationIDOutAttrName()); + } + if(jmsCorrelationIDAttrIdx != -1 && msg.getJMSCorrelationID() != null) { + outTuple.setObject(jmsCorrelationIDAttrIdx, new RString(msg.getJMSCorrelationID())); + } + + if(this.getJmsReplyToOutAttrName() != null) { + jmsReplyToAttrIdx = streamSchema.getAttributeIndex(this.getJmsReplyToOutAttrName()); + } + if(jmsReplyToAttrIdx != -1 && msg.getJMSReplyTo() != null) { + outTuple.setObject(jmsReplyToAttrIdx, new RString(msg.getJMSReplyTo().toString())); + } + + if(this.getJmsTypeOutAttrName() != null) { + jmsTypeAttrIdx = streamSchema.getAttributeIndex(this.getJmsTypeOutAttrName()); + } + if(jmsTypeAttrIdx != -1 && msg.getJMSType() != null) { + outTuple.setObject(jmsTypeAttrIdx, new RString(msg.getJMSType())); + } + + if(this.getJmsRedeliveredOutAttrName() != null) { + jmsRedeliveredAttrIdx = streamSchema.getAttributeIndex(this.getJmsRedeliveredOutAttrName()); + } + if(jmsRedeliveredAttrIdx != -1) { + outTuple.setObject(jmsRedeliveredAttrIdx, new Boolean(msg.getJMSRedelivered())); + } + } + + // Send the error message on to the error output port if one is specified private void sendOutputErrorMsg(String errorMessage) { OutputTuple errorTuple = errorOutputPort.newTuple(); diff --git a/com.ibm.streamsx.messaging/info.xml b/com.ibm.streamsx.messaging/info.xml index aece8b2..536e111 100644 --- a/com.ibm.streamsx.messaging/info.xml +++ b/com.ibm.streamsx.messaging/info.xml @@ -684,7 +684,7 @@ The <attribute> element has three possible attributes: * composite types * xml - 5.3.13 + 5.4.0 4.2.0.0 diff --git a/com.ibm.streamsx.messaging/pom-kafka-0.10.xml b/com.ibm.streamsx.messaging/pom-kafka-0.10.xml new file mode 100644 index 0000000..b792408 --- /dev/null +++ b/com.ibm.streamsx.messaging/pom-kafka-0.10.xml @@ -0,0 +1,54 @@ + + + 4.0.0 + com.ibm.streamsx.messaging + streamsx.messaging + jar + 5.3.13 + com.ibm.streamsx.messaging + + + apache.snapshots + http://repository.apache.org/content/repositories/snapshots/ + + + apache.releases + https://repository.apache.org/content/repositories/releases/ + + + Eclipse Paho Repo releases + https://repo.eclipse.org/content/repositories/paho-releases + + + + + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + 1.1.1 + + + org.apache.kafka + kafka-clients + 0.10.2.2 + + + com.rabbitmq + amqp-client + 5.4.3 + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.0.1 + + + + . + + + + diff --git a/com.ibm.streamsx.messaging/pom.xml b/com.ibm.streamsx.messaging/pom.xml index b792408..a309b3a 100644 --- a/com.ibm.streamsx.messaging/pom.xml +++ b/com.ibm.streamsx.messaging/pom.xml @@ -6,7 +6,7 @@ com.ibm.streamsx.messaging streamsx.messaging jar - 5.3.13 + 5.4.0 com.ibm.streamsx.messaging