diff --git a/installer/cli/deploy/standalone/tubemq/docker-compose.dev.yml b/installer/cli/deploy/standalone/tubemq/docker-compose.dev.yml new file mode 100644 index 0000000000..44ae40e081 --- /dev/null +++ b/installer/cli/deploy/standalone/tubemq/docker-compose.dev.yml @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: "3.4" +services: + tubemq: + image: inlong/tubemq-all:1.5.0 + ports: + - "8715:8715" + - "8123:8123" + # web + - "8080:8080" + - "8081:8081" diff --git a/installer/cli/deploy/standalone/tubemq/docker-compose.yml b/installer/cli/deploy/standalone/tubemq/docker-compose.yml new file mode 100644 index 0000000000..af677fd5cb --- /dev/null +++ b/installer/cli/deploy/standalone/tubemq/docker-compose.yml @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: "3.4" +services: + tubemq: + image: inlong/tubemq-all:1.5.0 + volumes: + - tubemq:/tubemq + logging: + driver: "json-file" + options: + max-size: "1m" + max-file: "1" + networks: + spnet: + +volumes: + tubemq: + +networks: + spnet: + external: true diff --git a/pom.xml b/pom.xml index 045286e085..bbca0d669f 100644 --- a/pom.xml +++ b/pom.xml @@ -183,6 +183,7 @@ 1.26 2.12.2 1.4.3-1 + 1.5.0 3.2.4 3.1.1 @@ -1358,6 +1359,11 @@ quartz ${quartz.version} + + org.apache.inlong + tubemq-client + ${inlong.version} + diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml b/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml index eee27f7651..fa3610ee52 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml @@ -147,6 +147,10 @@ org.apache.rocketmq rocketmq-client-java + + org.apache.inlong + tubemq-client + org.bouncycastle bcprov-jdk14 diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java index b709eaf8dc..cbac804a7b 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java @@ -33,6 +33,7 @@ import org.apache.streampipes.connect.iiot.protocol.stream.KafkaProtocol; import org.apache.streampipes.connect.iiot.protocol.stream.MqttProtocol; import org.apache.streampipes.connect.iiot.protocol.stream.NatsProtocol; +import org.apache.streampipes.connect.iiot.protocol.stream.TubeMQProtocol; import org.apache.streampipes.connect.iiot.protocol.stream.pulsar.PulsarProtocol; import org.apache.streampipes.connect.iiot.protocol.stream.rocketmq.RocketMQProtocol; import org.apache.streampipes.extensions.management.model.SpServiceDefinition; @@ -67,6 +68,7 @@ public SpServiceDefinition provideServiceDefinition() { .registerAdapter(new PulsarProtocol()) .registerAdapter(new RocketMQProtocol()) .registerAdapter(new HttpServerProtocol()) + .registerAdapter(new TubeMQProtocol()) .build(); } } diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/TubeMQProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/TubeMQProtocol.java new file mode 100644 index 0000000000..7a042de33f --- /dev/null +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/TubeMQProtocol.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.connect.iiot.protocol.stream; + +import org.apache.streampipes.extensions.api.connect.IAdapterPipeline; +import org.apache.streampipes.extensions.api.connect.IFormat; +import org.apache.streampipes.extensions.api.connect.IParser; +import org.apache.streampipes.extensions.api.connect.IProtocol; +import org.apache.streampipes.extensions.api.connect.exception.AdapterException; +import org.apache.streampipes.extensions.api.connect.exception.ParseException; +import org.apache.streampipes.extensions.management.connect.SendToPipeline; +import org.apache.streampipes.extensions.management.connect.adapter.sdk.ParameterExtractor; +import org.apache.streampipes.model.AdapterType; +import org.apache.streampipes.model.connect.grounding.ProtocolDescription; +import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder; +import org.apache.streampipes.sdk.helpers.AdapterSourceType; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.utils.Assets; + +import org.apache.commons.io.IOUtils; +import org.apache.inlong.tubemq.client.common.PeerInfo; +import org.apache.inlong.tubemq.client.config.ConsumerConfig; +import org.apache.inlong.tubemq.client.consumer.ConsumePosition; +import org.apache.inlong.tubemq.client.consumer.MessageListener; +import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer; +import org.apache.inlong.tubemq.client.exception.TubeClientException; +import org.apache.inlong.tubemq.client.factory.MessageSessionFactory; +import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory; +import org.apache.inlong.tubemq.corebase.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; + +public class TubeMQProtocol extends BrokerProtocol { + + private static final Logger LOGGER = LoggerFactory.getLogger(TubeMQProtocol.class); + + public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.tubemq"; + + public static final String TOPIC_KEY = "tubemq-topic"; + public static final String MASTER_HOST_AND_PORT_KEY = "tubemq-master-host-and-port"; + public static final String CONSUMER_GROUP_KEY = "tubemq-consumer-group"; + + private String consumerGroup; + + private MessageSessionFactory messageSessionFactory; + private PushMessageConsumer pushConsumer; + + public TubeMQProtocol() { + } + + private TubeMQProtocol(IParser parser, IFormat format, String masterHostAndPort, String topic, String consumerGroup) { + super(parser, format, masterHostAndPort, topic); + this.consumerGroup = consumerGroup; + } + + @Override + public IProtocol getInstance(ProtocolDescription protocolDescription, IParser parser, IFormat format) { + final ParameterExtractor extractor = new ParameterExtractor(protocolDescription.getConfig()); + + final String masterHostAndPort = extractor.singleValue(MASTER_HOST_AND_PORT_KEY, String.class); + final String topic = extractor.singleValue(TOPIC_KEY, String.class); + final String consumerGroup = extractor.singleValue(CONSUMER_GROUP_KEY, String.class); + + return new TubeMQProtocol(parser, format, masterHostAndPort, topic, consumerGroup); + } + + @Override + public ProtocolDescription declareModel() { + return ProtocolDescriptionBuilder.create(ID).withAssets(Assets.DOCUMENTATION, Assets.ICON).withLocales(Locales.EN) + .category(AdapterType.Generic).sourceType(AdapterSourceType.STREAM) + .requiredTextParameter(Labels.withId(MASTER_HOST_AND_PORT_KEY)).requiredTextParameter(Labels.withId(TOPIC_KEY)) + .requiredTextParameter(Labels.withId(CONSUMER_GROUP_KEY)).build(); + } + + @Override + public void run(IAdapterPipeline adapterPipeline) throws AdapterException { + final SendToPipeline sendToPipeline = new SendToPipeline(format, adapterPipeline); + + final ConsumerConfig consumerConfig = new ConsumerConfig(brokerUrl, consumerGroup); + consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET); + + try { + messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); + pushConsumer = messageSessionFactory.createPushConsumer(consumerConfig); + + pushConsumer.subscribe(topic, null, new MessageListener() { + @Override + public void receiveMessages(PeerInfo peerInfo, List messages) { + for (final Message message : messages) { + try { + parser.parse(IOUtils.toInputStream(new String(message.getData()), "UTF-8"), sendToPipeline); + } catch (ParseException e) { + LOGGER.error("Error while parsing: " + e.getMessage()); + e.printStackTrace(); + } + } + } + + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void stop() { + } + }); + pushConsumer.completeSubscribe(); + } catch (TubeClientException e) { + shutdown(messageSessionFactory, pushConsumer); + throw new AdapterException("Failed to create TubeMQ adapter.", e); + } + } + + @Override + public void stop() { + shutdown(messageSessionFactory, pushConsumer); + } + + @Override + public String getId() { + return ID; + } + + @Override + protected List getNByteElements(int n) throws ParseException { + final List elements = new ArrayList<>(); + + final ConsumerConfig consumerConfig = new ConsumerConfig(brokerUrl, consumerGroup); + consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_FIRST_OFFSET); + + MessageSessionFactory messageSessionFactory = null; + PushMessageConsumer pushConsumer = null; + try { + messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); + pushConsumer = messageSessionFactory.createPushConsumer(consumerConfig); + + final CountDownLatch countDownLatch = new CountDownLatch(n); + pushConsumer.subscribe(topic, null, new MessageListener() { + @Override + public void receiveMessages(PeerInfo peerInfo, List messages) { + for (final Message message : messages) { + if (countDownLatch.getCount() == 0) { + return; + } + elements.add(message.getData()); + countDownLatch.countDown(); + } + } + + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void stop() { + } + }); + pushConsumer.completeSubscribe(); + countDownLatch.await(); + } catch (TubeClientException | InterruptedException e) { + throw new ParseException("Failed to getNByteElements", e); + } finally { + shutdown(messageSessionFactory, pushConsumer); + } + + return elements; + } + + private static void shutdown(MessageSessionFactory messageSessionFactory, PushMessageConsumer pushConsumer) { + if (pushConsumer != null && !pushConsumer.isShutdown()) { + try { + pushConsumer.shutdown(); + } catch (Throwable ex) { + LOGGER.error("Failed to stop pushConsumer when TubeClientException occurred."); + } + } + + if (messageSessionFactory != null) { + try { + messageSessionFactory.shutdown(); + } catch (TubeClientException ex) { + LOGGER.error("Failed to stop messageSessionFactory when TubeClientException occurred."); + } + } + } +} diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/documentation.md b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/documentation.md new file mode 100644 index 0000000000..dbc91d9c26 --- /dev/null +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/documentation.md @@ -0,0 +1,48 @@ + + +## Apache TubeMQ (InLong) + +

+ +

+ +*** + +## Description + +Consumes messages from an Apache TubeMQ broker. + +*** + +## Configuration + +### TubeMQ Master Information + +This field describes the endpoints of all the TubeMQ masters. + +The format should be like `ip1:port1,ip2:port2,ip3:port3`. + +### TubeMQ Topic + +The topic where events should be sent to. + +### TubeMQ Consumer Group + +The consumer group of the TubeMQ Consumer. + diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/icon.png b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/icon.png new file mode 100644 index 0000000000..7c0fde0bc4 Binary files /dev/null and b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/icon.png differ diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/strings.en b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/strings.en new file mode 100644 index 0000000000..d8eadb8d06 --- /dev/null +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/strings.en @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +org.apache.streampipes.connect.iiot.protocol.stream.tubemq.title=Apache TubeMQ (InLong) +org.apache.streampipes.connect.iiot.protocol.stream.tubemq.description=Consumes messages from an Apache TubeMQ (InLong) broker. + +tubemq-master-host-and-port.title=TubeMQ Master Information +tubemq-master-host-and-port.description=This field describes the endpoints of all the TubeMQ masters. The format should be like "ip1:port1,ip2:port2,ip3:port3". + +tubemq-consumer-group.title=TubeMQ Consumer Group +tubemq-consumer-group.description=The consumer group of the TubeMQ Consumer + +tubemq-topic.title=TubeMQ Topic +tubemq-topic.description=Select a TubeMQ topic diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml b/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml index fb63ad3c01..cf3ce5d560 100644 --- a/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml @@ -88,6 +88,10 @@ jnats ${nats.version}
+ + org.apache.inlong + tubemq-client + diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java index a6ccb66f42..e242e18e4b 100644 --- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java @@ -37,6 +37,7 @@ import org.apache.streampipes.sinks.brokers.jvm.rabbitmq.RabbitMqController; import org.apache.streampipes.sinks.brokers.jvm.rest.RestController; import org.apache.streampipes.sinks.brokers.jvm.rocketmq.RocketMQPublisherSink; +import org.apache.streampipes.sinks.brokers.jvm.tubemq.TubeMQPublisherSink; import org.apache.streampipes.sinks.brokers.jvm.websocket.WebsocketServerSink; public class BrokersJvmInit extends ExtensionsModelSubmitter { @@ -61,6 +62,7 @@ public SpServiceDefinition provideServiceDefinition() { new WebsocketServerSink(), new PulsarPublisherSink(), new RocketMQPublisherSink(), + new TubeMQPublisherSink(), new NatsController()) .registerMessagingFormats( new JsonDataFormatFactory(), diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQParameters.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQParameters.java new file mode 100644 index 0000000000..9a81f8b51b --- /dev/null +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQParameters.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.sinks.brokers.jvm.tubemq; + +import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor; +import org.apache.streampipes.wrapper.standalone.SinkParams; + +import static org.apache.streampipes.sinks.brokers.jvm.tubemq.TubeMQPublisherSink.MASTER_HOST_AND_PORT_KEY; +import static org.apache.streampipes.sinks.brokers.jvm.tubemq.TubeMQPublisherSink.TOPIC_KEY; + +public class TubeMQParameters { + + private final String masterHostAndPort; + private final String topic; + + public TubeMQParameters(SinkParams parameters) { + DataSinkParameterExtractor extractor = parameters.extractor(); + + this.masterHostAndPort = extractor.singleValueParameter(MASTER_HOST_AND_PORT_KEY, String.class); + this.topic = extractor.singleValueParameter(TOPIC_KEY, String.class); + } + + public String getMasterHostAndPort() { + return masterHostAndPort; + } + + public String getTopic() { + return topic; + } +} diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQPublisherSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQPublisherSink.java new file mode 100644 index 0000000000..deeb2dcc3e --- /dev/null +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQPublisherSink.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.sinks.brokers.jvm.tubemq; + +import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.dataformat.SpDataFormatDefinition; +import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition; +import org.apache.streampipes.model.DataSinkType; +import org.apache.streampipes.model.graph.DataSinkDescription; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.sdk.builder.DataSinkBuilder; +import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.utils.Assets; +import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext; +import org.apache.streampipes.wrapper.standalone.SinkParams; +import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink; + +import org.apache.inlong.tubemq.client.config.TubeClientConfig; +import org.apache.inlong.tubemq.client.exception.TubeClientException; +import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory; +import org.apache.inlong.tubemq.client.producer.MessageProducer; +import org.apache.inlong.tubemq.client.producer.MessageSentResult; +import org.apache.inlong.tubemq.corebase.Message; + +import java.util.Map; + +public class TubeMQPublisherSink extends StreamPipesDataSink { + + public static final String MASTER_HOST_AND_PORT_KEY = "tubemq-master-host-and-port"; + public static final String TOPIC_KEY = "tubemq-topic"; + + private SpDataFormatDefinition spDataFormatDefinition; + private String topic; + + private MessageProducer messageProducer; + + @Override + public DataSinkDescription declareModel() { + return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.tubemq").category(DataSinkType.MESSAGING) + .withLocales(Locales.EN).withAssets(Assets.DOCUMENTATION, Assets.ICON) + .requiredStream(StreamRequirementsBuilder.create().requiredProperty(EpRequirements.anyProperty()).build()) + .requiredTextParameter(Labels.withId(MASTER_HOST_AND_PORT_KEY)).requiredTextParameter(Labels.withId(TOPIC_KEY)) + .build(); + } + + @Override + public void onInvocation(SinkParams sinkParams, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException { + final TubeMQParameters tubeMQParameters = new TubeMQParameters(sinkParams); + + spDataFormatDefinition = new JsonDataFormatDefinition(); + topic = tubeMQParameters.getTopic(); + + final TubeClientConfig tubeClientConfig = new TubeClientConfig(tubeMQParameters.getMasterHostAndPort()); + try { + messageProducer = new TubeSingleSessionFactory(tubeClientConfig).createProducer(); + messageProducer.publish(topic); + } catch (TubeClientException e) { + throw new SpRuntimeException(e); + } + } + + @Override + public void onEvent(Event event) throws SpRuntimeException { + final Map eventRawMap = event.getRaw(); + final byte[] eventMessage = spDataFormatDefinition.fromMap(eventRawMap); + final Message tubemqMessage = new Message(topic, eventMessage); + + try { + final MessageSentResult result = messageProducer.sendMessage(tubemqMessage); + if (!result.isSuccess()) { + throw new SpRuntimeException( + String.format("Failed to send message: %s, because: %s", tubemqMessage, result.getErrMsg())); + } + } catch (TubeClientException | InterruptedException e) { + throw new SpRuntimeException(e); + } + } + + @Override + public void onDetach() throws SpRuntimeException { + try { + messageProducer.shutdown(); + } catch (Throwable e) { + throw new SpRuntimeException(e); + } + } +} diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/documentation.md b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/documentation.md new file mode 100644 index 0000000000..6971c693d6 --- /dev/null +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/documentation.md @@ -0,0 +1,55 @@ + + +## TubeMQ (InLong) Publisher + +

+ +

+ +*** + +## Description + +Publishes events to Apache TubeMQ (InLong). + +*** + +## Required Inputs + +This sink does not have any requirements and works with any incoming event type. + +*** + +## Configuration + +### TubeMQ Master Information + +This field describes the endpoints of all the TubeMQ masters. + +The format should be like `ip1:port1,ip2:port2,ip3:port3`. + + +### TubeMQ Topic + +The topic where events should be sent to. + + +## Output + +(not applicable for data sinks) diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/icon.png b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/icon.png new file mode 100644 index 0000000000..7c0fde0bc4 Binary files /dev/null and b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/icon.png differ diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/strings.en b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/strings.en new file mode 100644 index 0000000000..9ac5b877ba --- /dev/null +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/strings.en @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.streampipes.sinks.brokers.jvm.tubemq.title=TubeMQ (InLong) Publisher +org.apache.streampipes.sinks.brokers.jvm.tubemq.description=Publish events to Apache TubeMQ (InLong) + +tubemq-master-host-and-port.title=TubeMQ Master Information +tubemq-master-host-and-port.description=This field describes the endpoints of all the TubeMQ masters. The format should be like "ip1:port1,ip2:port2,ip3:port3". + +tubemq-topic.title=TubeMQ Topic +tubemq-topic.description=Select a TubeMQ topic