diff --git a/installer/cli/deploy/standalone/tubemq/docker-compose.dev.yml b/installer/cli/deploy/standalone/tubemq/docker-compose.dev.yml
new file mode 100644
index 0000000000..44ae40e081
--- /dev/null
+++ b/installer/cli/deploy/standalone/tubemq/docker-compose.dev.yml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3.4"
+services:
+ tubemq:
+ image: inlong/tubemq-all:1.5.0
+ ports:
+ - "8715:8715"
+ - "8123:8123"
+ # web
+ - "8080:8080"
+ - "8081:8081"
diff --git a/installer/cli/deploy/standalone/tubemq/docker-compose.yml b/installer/cli/deploy/standalone/tubemq/docker-compose.yml
new file mode 100644
index 0000000000..af677fd5cb
--- /dev/null
+++ b/installer/cli/deploy/standalone/tubemq/docker-compose.yml
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3.4"
+services:
+ tubemq:
+ image: inlong/tubemq-all:1.5.0
+ volumes:
+ - tubemq:/tubemq
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "1m"
+ max-file: "1"
+ networks:
+ spnet:
+
+volumes:
+ tubemq:
+
+networks:
+ spnet:
+ external: true
diff --git a/pom.xml b/pom.xml
index 045286e085..bbca0d669f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -183,6 +183,7 @@
1.26
2.12.2
1.4.3-1
+ 1.5.0
3.2.4
3.1.1
@@ -1358,6 +1359,11 @@
quartz
${quartz.version}
+
+ org.apache.inlong
+ tubemq-client
+ ${inlong.version}
+
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml b/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
index eee27f7651..fa3610ee52 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
@@ -147,6 +147,10 @@
org.apache.rocketmq
rocketmq-client-java
+
+ org.apache.inlong
+ tubemq-client
+
org.bouncycastle
bcprov-jdk14
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
index b709eaf8dc..cbac804a7b 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
@@ -33,6 +33,7 @@
import org.apache.streampipes.connect.iiot.protocol.stream.KafkaProtocol;
import org.apache.streampipes.connect.iiot.protocol.stream.MqttProtocol;
import org.apache.streampipes.connect.iiot.protocol.stream.NatsProtocol;
+import org.apache.streampipes.connect.iiot.protocol.stream.TubeMQProtocol;
import org.apache.streampipes.connect.iiot.protocol.stream.pulsar.PulsarProtocol;
import org.apache.streampipes.connect.iiot.protocol.stream.rocketmq.RocketMQProtocol;
import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
@@ -67,6 +68,7 @@ public SpServiceDefinition provideServiceDefinition() {
.registerAdapter(new PulsarProtocol())
.registerAdapter(new RocketMQProtocol())
.registerAdapter(new HttpServerProtocol())
+ .registerAdapter(new TubeMQProtocol())
.build();
}
}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/TubeMQProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/TubeMQProtocol.java
new file mode 100644
index 0000000000..7a042de33f
--- /dev/null
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/TubeMQProtocol.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.iiot.protocol.stream;
+
+import org.apache.streampipes.extensions.api.connect.IAdapterPipeline;
+import org.apache.streampipes.extensions.api.connect.IFormat;
+import org.apache.streampipes.extensions.api.connect.IParser;
+import org.apache.streampipes.extensions.api.connect.IProtocol;
+import org.apache.streampipes.extensions.api.connect.exception.AdapterException;
+import org.apache.streampipes.extensions.api.connect.exception.ParseException;
+import org.apache.streampipes.extensions.management.connect.SendToPipeline;
+import org.apache.streampipes.extensions.management.connect.adapter.sdk.ParameterExtractor;
+import org.apache.streampipes.model.AdapterType;
+import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
+import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
+import org.apache.streampipes.sdk.helpers.AdapterSourceType;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.inlong.tubemq.client.common.PeerInfo;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
+import org.apache.inlong.tubemq.client.consumer.MessageListener;
+import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+
+public class TubeMQProtocol extends BrokerProtocol {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TubeMQProtocol.class);
+
+ public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.tubemq";
+
+ public static final String TOPIC_KEY = "tubemq-topic";
+ public static final String MASTER_HOST_AND_PORT_KEY = "tubemq-master-host-and-port";
+ public static final String CONSUMER_GROUP_KEY = "tubemq-consumer-group";
+
+ private String consumerGroup;
+
+ private MessageSessionFactory messageSessionFactory;
+ private PushMessageConsumer pushConsumer;
+
+ public TubeMQProtocol() {
+ }
+
+ private TubeMQProtocol(IParser parser, IFormat format, String masterHostAndPort, String topic, String consumerGroup) {
+ super(parser, format, masterHostAndPort, topic);
+ this.consumerGroup = consumerGroup;
+ }
+
+ @Override
+ public IProtocol getInstance(ProtocolDescription protocolDescription, IParser parser, IFormat format) {
+ final ParameterExtractor extractor = new ParameterExtractor(protocolDescription.getConfig());
+
+ final String masterHostAndPort = extractor.singleValue(MASTER_HOST_AND_PORT_KEY, String.class);
+ final String topic = extractor.singleValue(TOPIC_KEY, String.class);
+ final String consumerGroup = extractor.singleValue(CONSUMER_GROUP_KEY, String.class);
+
+ return new TubeMQProtocol(parser, format, masterHostAndPort, topic, consumerGroup);
+ }
+
+ @Override
+ public ProtocolDescription declareModel() {
+ return ProtocolDescriptionBuilder.create(ID).withAssets(Assets.DOCUMENTATION, Assets.ICON).withLocales(Locales.EN)
+ .category(AdapterType.Generic).sourceType(AdapterSourceType.STREAM)
+ .requiredTextParameter(Labels.withId(MASTER_HOST_AND_PORT_KEY)).requiredTextParameter(Labels.withId(TOPIC_KEY))
+ .requiredTextParameter(Labels.withId(CONSUMER_GROUP_KEY)).build();
+ }
+
+ @Override
+ public void run(IAdapterPipeline adapterPipeline) throws AdapterException {
+ final SendToPipeline sendToPipeline = new SendToPipeline(format, adapterPipeline);
+
+ final ConsumerConfig consumerConfig = new ConsumerConfig(brokerUrl, consumerGroup);
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+
+ try {
+ messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
+ pushConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
+
+ pushConsumer.subscribe(topic, null, new MessageListener() {
+ @Override
+ public void receiveMessages(PeerInfo peerInfo, List messages) {
+ for (final Message message : messages) {
+ try {
+ parser.parse(IOUtils.toInputStream(new String(message.getData()), "UTF-8"), sendToPipeline);
+ } catch (ParseException e) {
+ LOGGER.error("Error while parsing: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+ }
+ });
+ pushConsumer.completeSubscribe();
+ } catch (TubeClientException e) {
+ shutdown(messageSessionFactory, pushConsumer);
+ throw new AdapterException("Failed to create TubeMQ adapter.", e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ shutdown(messageSessionFactory, pushConsumer);
+ }
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+
+ @Override
+ protected List getNByteElements(int n) throws ParseException {
+ final List elements = new ArrayList<>();
+
+ final ConsumerConfig consumerConfig = new ConsumerConfig(brokerUrl, consumerGroup);
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_FIRST_OFFSET);
+
+ MessageSessionFactory messageSessionFactory = null;
+ PushMessageConsumer pushConsumer = null;
+ try {
+ messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
+ pushConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
+
+ final CountDownLatch countDownLatch = new CountDownLatch(n);
+ pushConsumer.subscribe(topic, null, new MessageListener() {
+ @Override
+ public void receiveMessages(PeerInfo peerInfo, List messages) {
+ for (final Message message : messages) {
+ if (countDownLatch.getCount() == 0) {
+ return;
+ }
+ elements.add(message.getData());
+ countDownLatch.countDown();
+ }
+ }
+
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+ }
+ });
+ pushConsumer.completeSubscribe();
+ countDownLatch.await();
+ } catch (TubeClientException | InterruptedException e) {
+ throw new ParseException("Failed to getNByteElements", e);
+ } finally {
+ shutdown(messageSessionFactory, pushConsumer);
+ }
+
+ return elements;
+ }
+
+ private static void shutdown(MessageSessionFactory messageSessionFactory, PushMessageConsumer pushConsumer) {
+ if (pushConsumer != null && !pushConsumer.isShutdown()) {
+ try {
+ pushConsumer.shutdown();
+ } catch (Throwable ex) {
+ LOGGER.error("Failed to stop pushConsumer when TubeClientException occurred.");
+ }
+ }
+
+ if (messageSessionFactory != null) {
+ try {
+ messageSessionFactory.shutdown();
+ } catch (TubeClientException ex) {
+ LOGGER.error("Failed to stop messageSessionFactory when TubeClientException occurred.");
+ }
+ }
+ }
+}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/documentation.md b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/documentation.md
new file mode 100644
index 0000000000..dbc91d9c26
--- /dev/null
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/documentation.md
@@ -0,0 +1,48 @@
+
+
+## Apache TubeMQ (InLong)
+
+
+
+
+
+***
+
+## Description
+
+Consumes messages from an Apache TubeMQ broker.
+
+***
+
+## Configuration
+
+### TubeMQ Master Information
+
+This field describes the endpoints of all the TubeMQ masters.
+
+The format should be like `ip1:port1,ip2:port2,ip3:port3`.
+
+### TubeMQ Topic
+
+The topic where events should be sent to.
+
+### TubeMQ Consumer Group
+
+The consumer group of the TubeMQ Consumer.
+
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/icon.png b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/icon.png
new file mode 100644
index 0000000000..7c0fde0bc4
Binary files /dev/null and b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/icon.png differ
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/strings.en b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/strings.en
new file mode 100644
index 0000000000..d8eadb8d06
--- /dev/null
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/strings.en
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+org.apache.streampipes.connect.iiot.protocol.stream.tubemq.title=Apache TubeMQ (InLong)
+org.apache.streampipes.connect.iiot.protocol.stream.tubemq.description=Consumes messages from an Apache TubeMQ (InLong) broker.
+
+tubemq-master-host-and-port.title=TubeMQ Master Information
+tubemq-master-host-and-port.description=This field describes the endpoints of all the TubeMQ masters. The format should be like "ip1:port1,ip2:port2,ip3:port3".
+
+tubemq-consumer-group.title=TubeMQ Consumer Group
+tubemq-consumer-group.description=The consumer group of the TubeMQ Consumer
+
+tubemq-topic.title=TubeMQ Topic
+tubemq-topic.description=Select a TubeMQ topic
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml b/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml
index fb63ad3c01..cf3ce5d560 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml
@@ -88,6 +88,10 @@
jnats
${nats.version}
+
+ org.apache.inlong
+ tubemq-client
+
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
index a6ccb66f42..e242e18e4b 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
@@ -37,6 +37,7 @@
import org.apache.streampipes.sinks.brokers.jvm.rabbitmq.RabbitMqController;
import org.apache.streampipes.sinks.brokers.jvm.rest.RestController;
import org.apache.streampipes.sinks.brokers.jvm.rocketmq.RocketMQPublisherSink;
+import org.apache.streampipes.sinks.brokers.jvm.tubemq.TubeMQPublisherSink;
import org.apache.streampipes.sinks.brokers.jvm.websocket.WebsocketServerSink;
public class BrokersJvmInit extends ExtensionsModelSubmitter {
@@ -61,6 +62,7 @@ public SpServiceDefinition provideServiceDefinition() {
new WebsocketServerSink(),
new PulsarPublisherSink(),
new RocketMQPublisherSink(),
+ new TubeMQPublisherSink(),
new NatsController())
.registerMessagingFormats(
new JsonDataFormatFactory(),
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQParameters.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQParameters.java
new file mode 100644
index 0000000000..9a81f8b51b
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQParameters.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.brokers.jvm.tubemq;
+
+import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+
+import static org.apache.streampipes.sinks.brokers.jvm.tubemq.TubeMQPublisherSink.MASTER_HOST_AND_PORT_KEY;
+import static org.apache.streampipes.sinks.brokers.jvm.tubemq.TubeMQPublisherSink.TOPIC_KEY;
+
+public class TubeMQParameters {
+
+ private final String masterHostAndPort;
+ private final String topic;
+
+ public TubeMQParameters(SinkParams parameters) {
+ DataSinkParameterExtractor extractor = parameters.extractor();
+
+ this.masterHostAndPort = extractor.singleValueParameter(MASTER_HOST_AND_PORT_KEY, String.class);
+ this.topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
+ }
+
+ public String getMasterHostAndPort() {
+ return masterHostAndPort;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQPublisherSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQPublisherSink.java
new file mode 100644
index 0000000000..deeb2dcc3e
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQPublisherSink.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.brokers.jvm.tubemq;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.SpDataFormatDefinition;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
+import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.sdk.builder.DataSinkBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
+
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.client.producer.MessageProducer;
+import org.apache.inlong.tubemq.client.producer.MessageSentResult;
+import org.apache.inlong.tubemq.corebase.Message;
+
+import java.util.Map;
+
+public class TubeMQPublisherSink extends StreamPipesDataSink {
+
+ public static final String MASTER_HOST_AND_PORT_KEY = "tubemq-master-host-and-port";
+ public static final String TOPIC_KEY = "tubemq-topic";
+
+ private SpDataFormatDefinition spDataFormatDefinition;
+ private String topic;
+
+ private MessageProducer messageProducer;
+
+ @Override
+ public DataSinkDescription declareModel() {
+ return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.tubemq").category(DataSinkType.MESSAGING)
+ .withLocales(Locales.EN).withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .requiredStream(StreamRequirementsBuilder.create().requiredProperty(EpRequirements.anyProperty()).build())
+ .requiredTextParameter(Labels.withId(MASTER_HOST_AND_PORT_KEY)).requiredTextParameter(Labels.withId(TOPIC_KEY))
+ .build();
+ }
+
+ @Override
+ public void onInvocation(SinkParams sinkParams, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+ final TubeMQParameters tubeMQParameters = new TubeMQParameters(sinkParams);
+
+ spDataFormatDefinition = new JsonDataFormatDefinition();
+ topic = tubeMQParameters.getTopic();
+
+ final TubeClientConfig tubeClientConfig = new TubeClientConfig(tubeMQParameters.getMasterHostAndPort());
+ try {
+ messageProducer = new TubeSingleSessionFactory(tubeClientConfig).createProducer();
+ messageProducer.publish(topic);
+ } catch (TubeClientException e) {
+ throw new SpRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void onEvent(Event event) throws SpRuntimeException {
+ final Map eventRawMap = event.getRaw();
+ final byte[] eventMessage = spDataFormatDefinition.fromMap(eventRawMap);
+ final Message tubemqMessage = new Message(topic, eventMessage);
+
+ try {
+ final MessageSentResult result = messageProducer.sendMessage(tubemqMessage);
+ if (!result.isSuccess()) {
+ throw new SpRuntimeException(
+ String.format("Failed to send message: %s, because: %s", tubemqMessage, result.getErrMsg()));
+ }
+ } catch (TubeClientException | InterruptedException e) {
+ throw new SpRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+ try {
+ messageProducer.shutdown();
+ } catch (Throwable e) {
+ throw new SpRuntimeException(e);
+ }
+ }
+}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/documentation.md b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/documentation.md
new file mode 100644
index 0000000000..6971c693d6
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/documentation.md
@@ -0,0 +1,55 @@
+
+
+## TubeMQ (InLong) Publisher
+
+
+
+
+
+***
+
+## Description
+
+Publishes events to Apache TubeMQ (InLong).
+
+***
+
+## Required Inputs
+
+This sink does not have any requirements and works with any incoming event type.
+
+***
+
+## Configuration
+
+### TubeMQ Master Information
+
+This field describes the endpoints of all the TubeMQ masters.
+
+The format should be like `ip1:port1,ip2:port2,ip3:port3`.
+
+
+### TubeMQ Topic
+
+The topic where events should be sent to.
+
+
+## Output
+
+(not applicable for data sinks)
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/icon.png b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/icon.png
new file mode 100644
index 0000000000..7c0fde0bc4
Binary files /dev/null and b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/icon.png differ
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/strings.en b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/strings.en
new file mode 100644
index 0000000000..9ac5b877ba
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/strings.en
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.streampipes.sinks.brokers.jvm.tubemq.title=TubeMQ (InLong) Publisher
+org.apache.streampipes.sinks.brokers.jvm.tubemq.description=Publish events to Apache TubeMQ (InLong)
+
+tubemq-master-host-and-port.title=TubeMQ Master Information
+tubemq-master-host-and-port.description=This field describes the endpoints of all the TubeMQ masters. The format should be like "ip1:port1,ip2:port2,ip3:port3".
+
+tubemq-topic.title=TubeMQ Topic
+tubemq-topic.description=Select a TubeMQ topic