From 07a6438de485419320869ea4f801f0b19072b108 Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Wed, 22 Jan 2025 13:15:16 +0800 Subject: [PATCH] [INLONG-11702][SDK] Optimize Sender factory implementation --- .../sdk/dataproxy/BaseMsgSenderFactory.java | 267 ++++++++++++++++++ .../sdk/dataproxy/DefaultMessageSender.java | 4 + .../inlong/sdk/dataproxy/MessageSender.java | 1 + .../sdk/dataproxy/MessageSenderFactory.java | 22 -- .../sdk/dataproxy/MsgSenderFactory.java | 91 ++++++ .../sdk/dataproxy/MsgSenderMultiFactory.java | 101 +++++++ .../sdk/dataproxy/MsgSenderSingleFactory.java | 108 +++++++ .../dataproxy/common/ProxyClientConfig.java | 14 +- .../dataproxy/common/SendMessageCallback.java | 5 + .../sdk/dataproxy/example/ExampleUtils.java | 259 +++++++++++++++++ .../example/InLongFactoryExample.java | 127 +++++++++ .../example/InLongTcpClientExample.java | 71 +++++ .../sdk/dataproxy/sender/BaseSender.java | 30 +- .../sender/tcp/InLongTcpMsgSender.java | 63 ++++- .../dataproxy/sender/tcp/TcpMsgSender.java | 73 +++-- .../sdk/dataproxy/utils/ProxyUtils.java | 16 ++ 16 files changed, 1182 insertions(+), 70 deletions(-) create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java delete mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSenderFactory.java create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongTcpClientExample.java diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java new file mode 100644 index 00000000000..0bc0cf4bb98 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy; + +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; +import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig; +import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry; +import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager; +import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; +import org.apache.inlong.sdk.dataproxy.sender.BaseSender; +import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender; +import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; +import org.apache.inlong.sdk.dataproxy.utils.LogCounter; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Base Message Sender Factory + * + * Used to manage the instance relationship of the sender factory. + * Since both singleton and multiple instances involve the same relationship, + * they are abstracted and maintained separately here. + */ +public class BaseMsgSenderFactory { + + private static final Logger logger = LoggerFactory.getLogger(BaseMsgSenderFactory.class); + private static final LogCounter exptCounter = new LogCounter(10, 100000, 60 * 1000L); + // msg send factory + private final MsgSenderFactory msgSenderFactory; + private final String factoryNo; + // for senders + private final ReentrantReadWriteLock senderCacheLock = new ReentrantReadWriteLock(); + // for inlong groupId -- Sender map + private final ConcurrentHashMap groupIdSenderMap = new ConcurrentHashMap<>(); + // for inlong clusterId -- Sender map + private final ConcurrentHashMap clusterIdSenderMap = new ConcurrentHashMap<>(); + + public BaseMsgSenderFactory(MsgSenderFactory msgSenderFactory, String factoryNo) { + this.msgSenderFactory = msgSenderFactory; + this.factoryNo = factoryNo; + logger.info("MsgSenderFactory({}) started", this.factoryNo); + } + + public void close() { + int totalSenderCnt; + int totalTDBankCnt; + senderCacheLock.writeLock().lock(); + try { + // release groupId mapped senders + totalSenderCnt = innReleaseAllGroupIdSenders(groupIdSenderMap); + // release clusterId mapped senders + totalSenderCnt += innReleaseAllClusterIdSenders(clusterIdSenderMap); + } finally { + senderCacheLock.writeLock().unlock(); + } + logger.info("MsgSenderFactory({}) closed, release {} inlong senders", + this.factoryNo, totalSenderCnt); + } + + public void removeClient(BaseSender msgSender) { + if (msgSender == null + || msgSender.getSenderFactory() == null + || msgSender.getSenderFactory() != msgSenderFactory) { + return; + } + boolean removed; + String senderId = msgSender.getSenderId(); + senderCacheLock.writeLock().lock(); + try { + if (msgSender.getFactoryClusterIdKey() == null) { + removed = innRemoveGroupIdSender(msgSender, groupIdSenderMap); + } else { + removed = innRemoveClusterIdSender(msgSender, clusterIdSenderMap); + } + } finally { + senderCacheLock.writeLock().unlock(); + } + if (removed) { + logger.info("MsgSenderFactory({}) removed sender({})", this.factoryNo, senderId); + } + } + + public int getMsgSenderCount() { + return groupIdSenderMap.size() + clusterIdSenderMap.size(); + } + + public InLongTcpMsgSender genTcpSenderByGroupId( + TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException { + ProxyUtils.validProxyConfigNotNull(configure); + // query cached sender + String metaConfigKey = configure.getGroupMetaConfigKey(); + InLongTcpMsgSender messageSender = + (InLongTcpMsgSender) groupIdSenderMap.get(metaConfigKey); + if (messageSender != null) { + return messageSender; + } + // valid configure info + ProcessResult procResult = new ProcessResult(); + qryProxyMetaConfigure(configure, procResult); + // generate sender + senderCacheLock.writeLock().lock(); + try { + // re-get the created sender based on the groupId key after locked + messageSender = (InLongTcpMsgSender) groupIdSenderMap.get(metaConfigKey); + if (messageSender != null) { + return messageSender; + } + // build a new sender based on groupId + messageSender = new InLongTcpMsgSender(configure, selfDefineFactory, msgSenderFactory, null); + if (!messageSender.start(procResult)) { + messageSender.close(); + throw new ProxySdkException("Failed to start groupId sender: " + procResult); + } + groupIdSenderMap.put(metaConfigKey, messageSender); + logger.info("MsgSenderFactory({}) generated a new groupId({}) sender({})", + this.factoryNo, metaConfigKey, messageSender.getSenderId()); + return messageSender; + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.warn("MsgSenderFactory({}) build groupId sender({}) exception", + this.factoryNo, metaConfigKey, ex); + } + throw new ProxySdkException("Failed to build groupId sender: " + ex.getMessage()); + } finally { + senderCacheLock.writeLock().unlock(); + } + } + + public InLongTcpMsgSender genTcpSenderByClusterId( + TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException { + ProxyUtils.validProxyConfigNotNull(configure); + // get groupId's clusterIdKey + ProcessResult procResult = new ProcessResult(); + ProxyConfigEntry proxyConfigEntry = qryProxyMetaConfigure(configure, procResult);; + String clusterIdKey = ProxyUtils.buildClusterIdKey( + configure.getDataRptProtocol(), configure.getRegionName(), proxyConfigEntry.getClusterId()); + // get local built sender + InLongTcpMsgSender messageSender = (InLongTcpMsgSender) clusterIdSenderMap.get(clusterIdKey); + if (messageSender != null) { + return messageSender; + } + // generate sender + senderCacheLock.writeLock().lock(); + try { + // re-get the created sender based on the clusterId Key after locked + messageSender = (InLongTcpMsgSender) clusterIdSenderMap.get(clusterIdKey); + if (messageSender != null) { + return messageSender; + } + // build a new sender based on clusterId Key + messageSender = new InLongTcpMsgSender(configure, + selfDefineFactory, msgSenderFactory, clusterIdKey); + if (!messageSender.start(procResult)) { + messageSender.close(); + throw new ProxySdkException("Failed to start cluster sender: " + procResult); + } + clusterIdSenderMap.put(clusterIdKey, messageSender); + logger.info("MsgSenderFactory({}) generated a new clusterId({}) sender({})", + this.factoryNo, clusterIdKey, messageSender.getSenderId()); + return messageSender; + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.warn("MsgSenderFactory({}) build cluster sender({}) exception", + this.factoryNo, clusterIdKey, ex); + } + throw new ProxySdkException("Failed to build cluster sender: " + ex.getMessage()); + } finally { + senderCacheLock.writeLock().unlock(); + } + } + + private ProxyConfigEntry qryProxyMetaConfigure( + ProxyClientConfig proxyConfig, ProcessResult procResult) throws ProxySdkException { + ProxyConfigManager inlongMetaQryMgr = new ProxyConfigManager(proxyConfig); + // check whether valid configure + if (!inlongMetaQryMgr.getGroupIdConfigure(true, procResult)) { + throw new ProxySdkException("Failed to query remote group config: " + procResult); + } + if (proxyConfig.isEnableReportEncrypt() + && !inlongMetaQryMgr.getEncryptConfigure(true, procResult)) { + throw new ProxySdkException("Failed to query remote encrypt config: " + procResult); + } + return inlongMetaQryMgr.getProxyConfigEntry(); + } + + private boolean innRemoveGroupIdSender(BaseSender msgSender, Map senderMap) { + BaseSender tmpSender = senderMap.get(msgSender.getMetaConfigKey()); + if (tmpSender == null + || !tmpSender.getSenderId().equals(msgSender.getSenderId())) { + return false; + } + return senderMap.remove(msgSender.getMetaConfigKey()) != null; + } + + private boolean innRemoveClusterIdSender(BaseSender msgSender, Map senderMap) { + BaseSender tmpSender = senderMap.get(msgSender.getFactoryClusterIdKey()); + if (tmpSender == null + || !tmpSender.getSenderId().equals(msgSender.getSenderId())) { + return false; + } + return senderMap.remove(msgSender.getFactoryClusterIdKey()) != null; + } + + private int innReleaseAllGroupIdSenders(Map senderMap) { + int totalSenderCnt = 0; + for (Map.Entry entry : senderMap.entrySet()) { + if (entry == null || entry.getValue() == null) { + continue; + } + try { + entry.getValue().close(); + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.warn("MsgSenderFactory({}) close groupId({})'s sender failure", + this.factoryNo, entry.getKey(), ex); + } + } + totalSenderCnt++; + } + senderMap.clear(); + return totalSenderCnt; + } + + private int innReleaseAllClusterIdSenders(Map senderMap) { + int totalSenderCnt = 0; + for (Map.Entry entry : senderMap.entrySet()) { + if (entry == null + || entry.getKey() == null + || entry.getValue() == null) { + continue; + } + try { + entry.getValue().close(); + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.warn("MsgSenderFactory({}) close clusterId({})'s sender failure", + this.factoryNo, entry.getKey(), ex); + } + } + totalSenderCnt++; + } + senderMap.clear(); + return totalSenderCnt; + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java index 52f2116e889..ecdc820a7de 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java @@ -44,6 +44,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +@Deprecated +/** + * Replace by InLongTcpMsgSender + */ public class DefaultMessageSender implements MessageSender { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMessageSender.class); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java index 862586ab77e..26fe170b904 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +@Deprecated public interface MessageSender { void close(); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSenderFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSenderFactory.java deleted file mode 100644 index a70dfa37a0d..00000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSenderFactory.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy; - -public class MessageSenderFactory { - -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java new file mode 100644 index 00000000000..a6a4e20b4cc --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy; + +import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; +import org.apache.inlong.sdk.dataproxy.sender.BaseSender; +import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender; +import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; + +import java.util.concurrent.ThreadFactory; + +/** + * Message Sender Factory interface + * + * Used to define the sender factory common methods + */ +public interface MsgSenderFactory { + + /** + * Shutdown all senders at the factory + * + */ + void shutdownAll() throws ProxySdkException; + + /** + * Remove the specified sender from the factory + * + * @param msgSender the specified sender + */ + void removeClient(BaseSender msgSender); + + /** + * Remove the sender number int the factory + * + * @return the number of senders currently in use + */ + int getMsgSenderCount(); + + /** + * Get or generate a sender from the factory according to groupId + * + * @param configure the sender configure + * @return the sender + */ + InLongTcpMsgSender genTcpSenderByGroupId( + TcpMsgSenderConfig configure) throws ProxySdkException; + + /** + * Get or generate a sender from the factory according to groupId + * + * @param configure the sender configure + * @param selfDefineFactory the self defined network threads factory + * @return the sender + */ + InLongTcpMsgSender genTcpSenderByGroupId( + TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException; + + /** + * Get or generate a sender from the factory according to clusterId + * + * @param configure the sender configure + * @return the sender + */ + InLongTcpMsgSender genTcpSenderByClusterId( + TcpMsgSenderConfig configure) throws ProxySdkException; + + /** + * Get or generate a sender from the factory according to clusterId + * + * @param configure the sender configure + * @param selfDefineFactory the self defined network threads factory + * @return the sender + */ + InLongTcpMsgSender genTcpSenderByClusterId( + TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException; +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java new file mode 100644 index 00000000000..0cb595c2612 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy; + +import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; +import org.apache.inlong.sdk.dataproxy.sender.BaseSender; +import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender; +import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Multiple Instances Message Sender Factory + * + * Used to define the Multiple instance sender factory + */ +public class MsgSenderMultiFactory implements MsgSenderFactory { + + private static final AtomicLong refCounter = new AtomicLong(0); + private final AtomicBoolean initialized = new AtomicBoolean(false); + private final BaseMsgSenderFactory baseMsgSenderFactory; + + public MsgSenderMultiFactory() { + this.baseMsgSenderFactory = new BaseMsgSenderFactory( + this, "iMultiFact-" + refCounter.incrementAndGet()); + this.initialized.set(true); + } + + @Override + public void shutdownAll() throws ProxySdkException { + if (!this.initialized.get()) { + throw new ProxySdkException("Please initialize the factory first!"); + } + this.baseMsgSenderFactory.close(); + } + + @Override + public void removeClient(BaseSender msgSender) { + if (msgSender == null + || msgSender.getSenderFactory() == null + || msgSender.getSenderFactory() != this) { + return; + } + this.baseMsgSenderFactory.removeClient(msgSender); + } + + @Override + public int getMsgSenderCount() { + return this.baseMsgSenderFactory.getMsgSenderCount(); + } + + @Override + public InLongTcpMsgSender genTcpSenderByGroupId( + TcpMsgSenderConfig configure) throws ProxySdkException { + return genTcpSenderByGroupId(configure, null); + } + + @Override + public InLongTcpMsgSender genTcpSenderByGroupId( + TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException { + if (!this.initialized.get()) { + throw new ProxySdkException("Please initialize the factory first!"); + } + ProxyUtils.validProxyConfigNotNull(configure); + return this.baseMsgSenderFactory.genTcpSenderByGroupId(configure, selfDefineFactory); + } + + @Override + public InLongTcpMsgSender genTcpSenderByClusterId( + TcpMsgSenderConfig configure) throws ProxySdkException { + return genTcpSenderByClusterId(configure, null); + } + + @Override + public InLongTcpMsgSender genTcpSenderByClusterId( + TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException { + if (!this.initialized.get()) { + throw new ProxySdkException("Please initialize the factory first!"); + } + ProxyUtils.validProxyConfigNotNull(configure); + return this.baseMsgSenderFactory.genTcpSenderByClusterId(configure, selfDefineFactory); + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java new file mode 100644 index 00000000000..91b17351555 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy; + +import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; +import org.apache.inlong.sdk.dataproxy.sender.BaseSender; +import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender; +import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Singleton Message Sender Factory + * + * Used to define the singleton sender factory + */ +public class MsgSenderSingleFactory implements MsgSenderFactory { + + private static final AtomicBoolean initialized = new AtomicBoolean(false); + private static final AtomicLong singletonRefCounter = new AtomicLong(0); + private static BaseMsgSenderFactory baseMsgSenderFactory; + + public MsgSenderSingleFactory() { + if (singletonRefCounter.incrementAndGet() == 1) { + baseMsgSenderFactory = new BaseMsgSenderFactory(this, "iSingleFct"); + initialized.set(true); + } + while (!initialized.get()) { + ProxyUtils.sleepSomeTime(50L); + } + } + + @Override + public void shutdownAll() throws ProxySdkException { + if (!initialized.get()) { + throw new ProxySdkException("Please initialize the factory first!"); + } + if (singletonRefCounter.decrementAndGet() > 0) { + return; + } + baseMsgSenderFactory.close(); + } + + @Override + public void removeClient(BaseSender msgSender) { + if (msgSender == null + || msgSender.getSenderFactory() == null + || msgSender.getSenderFactory() != this) { + return; + } + baseMsgSenderFactory.removeClient(msgSender); + } + + @Override + public int getMsgSenderCount() { + return baseMsgSenderFactory.getMsgSenderCount(); + } + + @Override + public InLongTcpMsgSender genTcpSenderByGroupId( + TcpMsgSenderConfig configure) throws ProxySdkException { + return genTcpSenderByGroupId(configure, null); + } + + @Override + public InLongTcpMsgSender genTcpSenderByGroupId( + TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException { + if (!initialized.get()) { + throw new ProxySdkException("Please initialize the factory first!"); + } + ProxyUtils.validProxyConfigNotNull(configure); + return baseMsgSenderFactory.genTcpSenderByGroupId(configure, selfDefineFactory); + } + + @Override + public InLongTcpMsgSender genTcpSenderByClusterId( + TcpMsgSenderConfig configure) throws ProxySdkException { + return genTcpSenderByClusterId(configure, null); + } + + @Override + public InLongTcpMsgSender genTcpSenderByClusterId( + TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException { + if (!initialized.get()) { + throw new ProxySdkException("Please initialize the factory first!"); + } + ProxyUtils.validProxyConfigNotNull(configure); + return baseMsgSenderFactory.genTcpSenderByClusterId(configure, selfDefineFactory); + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java index 48039c7c65e..274b24f16d2 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java @@ -19,6 +19,7 @@ import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; import org.apache.inlong.sdk.dataproxy.metric.MetricConfig; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -112,7 +113,8 @@ protected ProxyClientConfig(boolean visitMgrByHttps, String managerIP, int manag this.inlongGroupId = groupId.trim(); this.dataRptProtocol = rptProtocol.name(); this.setRegionName(regionName); - this.groupMetaConfigKey = this.buildMgrConfigKey(); + this.groupMetaConfigKey = ProxyUtils.buildGroupIdConfigKey( + this.dataRptProtocol, this.regionName, this.inlongGroupId); } protected ProxyClientConfig(String managerAddress, @@ -124,7 +126,8 @@ protected ProxyClientConfig(String managerAddress, this.inlongGroupId = groupId.trim(); this.dataRptProtocol = rptProtocol.name(); this.setRegionName(regionName); - this.groupMetaConfigKey = this.buildMgrConfigKey(); + this.groupMetaConfigKey = ProxyUtils.buildGroupIdConfigKey( + this.dataRptProtocol, this.regionName, this.inlongGroupId); } public void setMgrAuthzInfo(boolean needMgrAuthz, @@ -226,7 +229,8 @@ public String getGroupMetaConfigKey() { public void setRegionName(String regionName) { if (StringUtils.isNotBlank(regionName)) { this.regionName = regionName.trim().toLowerCase(); - this.groupMetaConfigKey = this.buildMgrConfigKey(); + this.groupMetaConfigKey = ProxyUtils.buildGroupIdConfigKey( + this.dataRptProtocol, this.regionName, this.inlongGroupId); } } @@ -520,8 +524,4 @@ private void checkAndParseAddress(String managerAddress) throws ProxySdkExceptio } this.managerPort = tmValue; } - - private String buildMgrConfigKey() { - return this.inlongGroupId + ":" + this.regionName + ":" + this.dataRptProtocol; - } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java index 9e83f4673c4..48ce6070376 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java @@ -17,6 +17,11 @@ package org.apache.inlong.sdk.dataproxy.common; +@Deprecated +/** + * Replace by MsgSendCallback + * + */ public interface SendMessageCallback { /* Invoked when a message is confirmed by TDBus. */ diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java new file mode 100644 index 00000000000..b271aeccf18 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.example; + +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; +import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback; +import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo; +import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSender; +import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo; +import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender; + +import org.apache.commons.codec.binary.StringUtils; + +import java.nio.ByteBuffer; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ExampleUtils { + + private static final SecureRandom cntRandom = new SecureRandom( + Long.toString(System.nanoTime()).getBytes()); + + public static void sendTcpMessages(TcpMsgSender msgSender, boolean isSync, boolean isMultiItem, + String groupId, String streamId, int reqCnt, int baseBodyLen, int msgItemCnt, ProcessResult procResult) { + int sucCnt = 0; + int curCount = 0; + TcpEventInfo eventInfo; + byte[] itemBody = buildBoydData(baseBodyLen); + List multiBodys = new ArrayList<>(); + for (int i = 0; i < msgItemCnt; i++) { + multiBodys.add(itemBody); + } + Map localAttrs = new HashMap<>(); + if (isSync) { + if (isMultiItem) { + // send single message + while (curCount++ < reqCnt) { + try { + if (curCount > 1) { + localAttrs.clear(); + localAttrs.put("index", String.valueOf(curCount)); + } + eventInfo = new TcpEventInfo(groupId, streamId, + System.currentTimeMillis(), localAttrs, multiBodys); + } catch (Throwable ex) { + System.out.println("Build tcp event failure, ex=" + ex); + continue; + } + if (!msgSender.sendMessage(eventInfo, procResult)) { + System.out.println("Sync request index=" + curCount + ", process result=" + procResult); + continue; + } + curCount++; + } + } else { + // send single message + while (curCount++ < reqCnt) { + try { + if (curCount > 1) { + localAttrs.clear(); + localAttrs.put("index", String.valueOf(curCount)); + localAttrs.put("multi", String.valueOf(false)); + } + eventInfo = new TcpEventInfo(groupId, streamId, + System.currentTimeMillis(), localAttrs, itemBody); + } catch (Throwable ex) { + System.out.println("Build tcp event failure, ex=" + ex); + continue; + } + if (!msgSender.sendMessage(eventInfo, procResult)) { + System.out.println("Sync request index=" + curCount + ", process result=" + procResult); + continue; + } + curCount++; + } + } + } else { + if (isMultiItem) { + // send multiple message + while (curCount++ < reqCnt) { + try { + if (curCount > 1) { + localAttrs.clear(); + localAttrs.put("index", String.valueOf(curCount)); + localAttrs.put("multi", String.valueOf(true)); + } + eventInfo = new TcpEventInfo(groupId, streamId, + System.currentTimeMillis(), localAttrs, multiBodys); + } catch (Throwable ex) { + System.out.println("Build multiple tcp event failure, ex=" + ex); + continue; + } + if (!msgSender.asyncSendMessage(eventInfo, new MyMsgSendBack(curCount), procResult)) { + System.out.println("Async request index=" + curCount + ", post result=" + procResult); + continue; + } + curCount++; + } + } else { + // send single message + while (curCount++ < reqCnt) { + try { + eventInfo = new TcpEventInfo(groupId, streamId, + System.currentTimeMillis(), null, itemBody); + } catch (Throwable ex) { + System.out.println("Build tcp event failure, ex=" + ex); + continue; + } + if (!msgSender.asyncSendMessage(eventInfo, new MyMsgSendBack(curCount), procResult)) { + System.out.println("Async request index=" + curCount + ", post result=" + procResult); + continue; + } + curCount++; + } + } + } + } + + public static void sendHttpMessages(HttpMsgSender msgSender, boolean isSync, boolean isMultiItem, + String groupId, String streamId, int reqCnt, int baseBodyLen, int msgItemCnt, ProcessResult procResult) { + int sucCnt = 0; + int curCount = 0; + HttpEventInfo eventInfo; + String itemBody = getRandomString(baseBodyLen); + List multiBodys = new ArrayList<>(); + for (int i = 0; i < msgItemCnt; i++) { + multiBodys.add(itemBody); + } + if (isSync) { + if (isMultiItem) { + // send multiple message + while (curCount++ < reqCnt) { + try { + eventInfo = new HttpEventInfo(groupId, streamId, + System.currentTimeMillis(), multiBodys); + } catch (Throwable ex) { + System.out.println("Build multiple http event failure, ex=" + ex); + continue; + } + if (!msgSender.syncSendMessage(eventInfo, procResult)) { + System.out.println("Sync request index=" + curCount + ", process result=" + procResult); + continue; + } + curCount++; + } + } else { + // send single message + while (curCount++ < reqCnt) { + try { + eventInfo = new HttpEventInfo(groupId, streamId, + System.currentTimeMillis(), itemBody); + } catch (Throwable ex) { + System.out.println("Build single http event failure, ex=" + ex); + continue; + } + if (!msgSender.syncSendMessage(eventInfo, procResult)) { + System.out.println("Sync request index=" + curCount + ", process result=" + procResult); + continue; + } + curCount++; + } + } + } else { + if (isMultiItem) { + // send multiple message + while (curCount++ < reqCnt) { + try { + eventInfo = new HttpEventInfo(groupId, streamId, + System.currentTimeMillis(), multiBodys); + } catch (Throwable ex) { + System.out.println("Build multiple http event failure, ex=" + ex); + continue; + } + if (!msgSender.asyncSendMessage(eventInfo, new MyMsgSendBack(curCount), procResult)) { + System.out.println("Async request index=" + curCount + ", post result=" + procResult); + continue; + } + curCount++; + } + } else { + // send single message + while (curCount++ < reqCnt) { + try { + eventInfo = new HttpEventInfo(groupId, streamId, System.currentTimeMillis(), itemBody); + } catch (Throwable ex) { + System.out.println("Build single http event failure, ex=" + ex); + continue; + } + if (!msgSender.asyncSendMessage(eventInfo, new MyMsgSendBack(curCount), procResult)) { + System.out.println("Async request: index=" + curCount + ", post result=" + procResult); + continue; + } + curCount++; + } + } + } + } + + private static String getRandomString(int length) { + String strBase = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < length; i++) { + int number = cntRandom.nextInt(strBase.length()); + sb.append(strBase.charAt(number)); + } + return sb.toString(); + } + + private static byte[] buildBoydData(int bodySize) { + final byte[] itemBaseData = + StringUtils.getBytesUtf8("inglong tcp test data!"); + final ByteBuffer dataBuffer = ByteBuffer.allocate(bodySize); + while (dataBuffer.hasRemaining()) { + int offset = dataBuffer.arrayOffset(); + dataBuffer.put(itemBaseData, offset, + Math.min(dataBuffer.remaining(), itemBaseData.length)); + } + dataBuffer.flip(); + return dataBuffer.array(); + } + + private static class MyMsgSendBack implements MsgSendCallback { + + private final int msgId; + + public MyMsgSendBack(int msgId) { + this.msgId = msgId; + } + + @Override + public void onMessageAck(ProcessResult result) { + // System.out.println("msgId=" + msgId + ", send result = " + result); + } + + @Override + public void onException(Throwable ex) { + System.out.println("msgId=" + msgId + ", send exception=" + ex); + + } + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java new file mode 100644 index 00000000000..d1c8b8e19c8 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.example; + +import org.apache.inlong.common.msg.MsgType; +import org.apache.inlong.sdk.dataproxy.MsgSenderMultiFactory; +import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory; +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; +import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender; +import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; + +import io.netty.util.concurrent.DefaultThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ThreadFactory; + +public class InLongFactoryExample { + + protected static final Logger logger = LoggerFactory.getLogger(InLongFactoryExample.class); + + public static void main(String[] args) throws Exception { + + String managerIp = args[0]; + int managerPort = Integer.parseInt(args[1]); + String groupId = args[2]; + String streamId = args[3]; + String secretId = args[4]; + String secretKey = args[5]; + int reqCnt = Integer.parseInt(args[6]); + int msgSize = 1024; + int msgCnt = 1; + if (args.length > 7) { + msgSize = Integer.parseInt(args[7]); + msgCnt = Integer.parseInt(args[8]); + } + + System.out.println("InLongFactoryExample start"); + + // build singleton factory + MsgSenderSingleFactory singleFactory = new MsgSenderSingleFactory(); + // report data by tcp + TcpMsgSenderConfig tcpMsgSenderConfig = new TcpMsgSenderConfig( + false, managerIp, managerPort, groupId, secretId, secretKey); + tcpMsgSenderConfig.setRequestTimeoutMs(20000L); + InLongTcpMsgSender tcpMsgSender = + singleFactory.genTcpSenderByClusterId(tcpMsgSenderConfig); + ProcessResult procResult = new ProcessResult(); + if (!tcpMsgSender.start(procResult)) { + System.out.println("Start tcp sender failure: process result=" + procResult); + } + + // report data + ExampleUtils.sendTcpMessages(tcpMsgSender, false, false, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + ExampleUtils.sendTcpMessages(tcpMsgSender, false, true, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + ProxyUtils.sleepSomeTime(10000L); + tcpMsgSender.close(); + + // report data use multi-factory + MsgSenderMultiFactory multiFactory1 = new MsgSenderMultiFactory(); + MsgSenderMultiFactory multiFactory2 = new MsgSenderMultiFactory(); + // report data by tcp + tcpMsgSenderConfig.setSdkMsgType(MsgType.MSG_ACK_SERVICE); + InLongTcpMsgSender tcpMsgSender1 = + multiFactory1.genTcpSenderByGroupId(tcpMsgSenderConfig); + if (!tcpMsgSender1.start(procResult)) { + System.out.println("Start tcp sender1 failure: process result=" + procResult); + } + + String managerAddr = "http://" + managerIp + ":" + managerPort; + TcpMsgSenderConfig tcpMsgSenderConfig2 = + new TcpMsgSenderConfig(managerAddr, groupId, secretId, secretKey); + tcpMsgSenderConfig2.setSdkMsgType(MsgType.MSG_MULTI_BODY); + InLongTcpMsgSender tcpMsgSender2 = + multiFactory2.genTcpSenderByGroupId(tcpMsgSenderConfig2); + ExampleUtils.sendTcpMessages(tcpMsgSender1, false, false, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + ExampleUtils.sendTcpMessages(tcpMsgSender2, false, true, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + ProxyUtils.sleepSomeTime(10000L); + tcpMsgSender1.close(); + System.out.println("Multi-1.1 Cur multiFactory1 sender count = " + + multiFactory1.getMsgSenderCount() + + ", cur multiFactory2 sender count is " + multiFactory2.getMsgSenderCount()); + tcpMsgSender2.close(); + System.out.println("Multi-1.2 Cur multiFactory1 sender count = " + + multiFactory1.getMsgSenderCount() + + ", cur multiFactory2 sender count is " + multiFactory2.getMsgSenderCount()); + + // test self DefineFactory + ThreadFactory selfDefineFactory = new DefaultThreadFactory("test_self_thread_factory"); + InLongTcpMsgSender tcpMsgSelfSender = + singleFactory.genTcpSenderByGroupId(tcpMsgSenderConfig, selfDefineFactory); + ExampleUtils.sendTcpMessages(tcpMsgSelfSender, false, false, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + ProxyUtils.sleepSomeTime(10000L); + + tcpMsgSelfSender.close(); + + System.out.println("singleFactory-3 Cur singleFactory sender count = " + + singleFactory.getMsgSenderCount()); + + ProxyUtils.sleepSomeTime(3 * 60 * 1000L); + + // close all + multiFactory1.shutdownAll(); + multiFactory2.shutdownAll(); + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongTcpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongTcpClientExample.java new file mode 100644 index 00000000000..fce1404e0d4 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongTcpClientExample.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.example; + +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; +import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender; +import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InLongTcpClientExample { + + protected static final Logger logger = LoggerFactory.getLogger(InLongTcpClientExample.class); + + public static void main(String[] args) throws Exception { + + String managerIp = args[0]; + String managerPort = args[1]; + String groupId = args[2]; + String streamId = args[3]; + String secretId = args[4]; + String secretKey = args[5]; + int reqCnt = Integer.parseInt(args[6]); + int msgSize = 1024; + int msgCnt = 1; + if (args.length > 7) { + msgSize = Integer.parseInt(args[7]); + msgCnt = Integer.parseInt(args[8]); + } + + String managerAddr = "http://" + managerIp + ":" + managerPort; + + TcpMsgSenderConfig dataProxyConfig = + new TcpMsgSenderConfig(managerAddr, groupId, secretId, secretKey); + dataProxyConfig.setRequestTimeoutMs(20000L); + InLongTcpMsgSender messageSender = new InLongTcpMsgSender(dataProxyConfig); + + logger.info("InLongTcpMsgSender start"); + + ProcessResult procResult = new ProcessResult(); + if (!messageSender.start(procResult)) { + System.out.println("Start sender failure: process result=" + procResult.toString()); + } + ExampleUtils.sendTcpMessages(messageSender, true, false, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + ExampleUtils.sendTcpMessages(messageSender, true, true, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + ExampleUtils.sendTcpMessages(messageSender, false, false, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + ExampleUtils.sendTcpMessages(messageSender, false, true, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + ProxyUtils.sleepSomeTime(10000L); + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java index a5f44f17ee0..c103bd31f4f 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java @@ -17,11 +17,13 @@ package org.apache.inlong.sdk.dataproxy.sender; +import org.apache.inlong.sdk.dataproxy.MsgSenderFactory; import org.apache.inlong.sdk.dataproxy.common.ErrorCode; import org.apache.inlong.sdk.dataproxy.common.ProcessResult; import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig; import org.apache.inlong.sdk.dataproxy.config.ConfigHolder; import org.apache.inlong.sdk.dataproxy.config.HostInfo; +import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry; import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager; import org.apache.inlong.sdk.dataproxy.network.ClientMgr; import org.apache.inlong.sdk.dataproxy.utils.LogCounter; @@ -58,6 +60,8 @@ public abstract class BaseSender implements ConfigHolder { private static final AtomicLong senderIdGen = new AtomicLong(0L); // protected final AtomicInteger senderStatus = new AtomicInteger(SENDER_STATUS_UNINITIALIZED); + protected final MsgSenderFactory senderFactory; + private final String factoryClusterIdKey; protected final String senderId; protected final ProxyClientConfig baseConfig; protected ClientMgr clientMgr; @@ -71,15 +75,20 @@ public abstract class BaseSender implements ConfigHolder { protected volatile int groupIdNum = 0; private Map streamIdMap = new HashMap<>(); - protected BaseSender(ProxyClientConfig configure) { + protected BaseSender(ProxyClientConfig configure, MsgSenderFactory senderFactory, String clusterIdKey) { + if (configure == null) { + throw new NullPointerException("configure is null"); + } this.baseConfig = configure.clone(); + this.senderFactory = senderFactory; + this.factoryClusterIdKey = clusterIdKey; this.senderId = configure.getDataRptProtocol() + "-" + senderIdGen.incrementAndGet(); } public boolean start(ProcessResult procResult) { if (!this.senderStatus.compareAndSet( SENDER_STATUS_UNINITIALIZED, SENDER_STATUS_INITIALIZING)) { - return procResult.setFailResult(ErrorCode.OK); + return procResult.setSuccess(); } // start client manager if (!this.clientMgr.start(procResult)) { @@ -103,7 +112,7 @@ public boolean start(ProcessResult procResult) { this.configManager.start(); this.senderStatus.set(SENDER_STATUS_STARTED); logger.info("Sender({}) instance started!", senderId); - return procResult.setFailResult(ErrorCode.OK); + return procResult.setSuccess(); } public void close() { @@ -116,6 +125,9 @@ public void close() { } configManager.shutDown(); clientMgr.stop(); + if (this.senderFactory != null) { + this.senderFactory.removeClient(this); + } logger.info("Sender({}) instance stopped!", senderId); } @@ -165,10 +177,22 @@ public boolean isStarted() { return senderStatus.get() == SENDER_STATUS_STARTED; } + public MsgSenderFactory getSenderFactory() { + return senderFactory; + } + + public String getFactoryClusterIdKey() { + return factoryClusterIdKey; + } + public String getMetaConfigKey() { return this.baseConfig.getGroupMetaConfigKey(); } + public ProxyConfigEntry getProxyConfigEntry() { + return configManager.getProxyConfigEntry(); + } + public String getSenderId() { return senderId; } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java index 97076dd9777..aac6d97aee6 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java @@ -19,6 +19,7 @@ import org.apache.inlong.common.msg.AttributeConstants; import org.apache.inlong.common.msg.MsgType; +import org.apache.inlong.sdk.dataproxy.MsgSenderFactory; import org.apache.inlong.sdk.dataproxy.common.ErrorCode; import org.apache.inlong.sdk.dataproxy.common.ProcessResult; import org.apache.inlong.sdk.dataproxy.common.SdkConsts; @@ -56,13 +57,54 @@ public class InLongTcpMsgSender extends BaseSender implements TcpMsgSender { private final TcpMsgSenderConfig tcpConfig; private final TcpClientMgr tcpClientMgr; - protected InLongTcpMsgSender(TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) { - super(configure); + public InLongTcpMsgSender(TcpMsgSenderConfig configure) { + this(configure, null, null, null); + } + + public InLongTcpMsgSender(TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) { + this(configure, selfDefineFactory, null, null); + } + + public InLongTcpMsgSender(TcpMsgSenderConfig configure, + ThreadFactory selfDefineFactory, MsgSenderFactory senderFactory, String clusterIdKey) { + super(configure, senderFactory, clusterIdKey); this.tcpConfig = (TcpMsgSenderConfig) baseConfig; this.clientMgr = new TcpClientMgr(this.getSenderId(), this.tcpConfig, selfDefineFactory); this.tcpClientMgr = (TcpClientMgr) clientMgr; } + @Override + public boolean sendMessage(TcpEventInfo eventInfo, ProcessResult procResult) { + if (eventInfo == null) { + throw new NullPointerException("eventInfo is null"); + } + if (procResult == null) { + throw new NullPointerException("procResult is null"); + } + if (!this.isStarted()) { + return procResult.setFailResult(ErrorCode.SDK_CLOSED); + } + return processEvent(SendQos.SOURCE_ACK, eventInfo, null, procResult); + } + + @Override + public boolean asyncSendMessage( + TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult procResult) { + if (eventInfo == null) { + throw new NullPointerException("eventInfo is null"); + } + if (callback == null) { + throw new NullPointerException("callback is null"); + } + if (procResult == null) { + throw new NullPointerException("procResult is null"); + } + if (!this.isStarted()) { + return procResult.setFailResult(ErrorCode.SDK_CLOSED); + } + return processEvent(SendQos.SOURCE_ACK, eventInfo, callback, procResult); + } + @Override public boolean sendMessageWithoutAck(TcpEventInfo eventInfo, ProcessResult procResult) { if (eventInfo == null) { @@ -78,8 +120,7 @@ public boolean sendMessageWithoutAck(TcpEventInfo eventInfo, ProcessResult procR } @Override - public boolean syncSendMessage(boolean sendInB2B, - TcpEventInfo eventInfo, ProcessResult procResult) { + public boolean sendMsgWithSinkAck(TcpEventInfo eventInfo, ProcessResult procResult) { if (eventInfo == null) { throw new NullPointerException("eventInfo is null"); } @@ -89,15 +130,11 @@ public boolean syncSendMessage(boolean sendInB2B, if (!this.isStarted()) { return procResult.setFailResult(ErrorCode.SDK_CLOSED); } - if (sendInB2B) { - return processEvent(SendQos.SOURCE_ACK, eventInfo, null, procResult); - } else { - return processEvent(SendQos.SINK_ACK, eventInfo, null, procResult); - } + return processEvent(SendQos.SINK_ACK, eventInfo, null, procResult); } @Override - public boolean asyncSendMessage(boolean sendInB2B, + public boolean asyncSendMsgWithSinkAck( TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult procResult) { if (eventInfo == null) { throw new NullPointerException("eventInfo is null"); @@ -111,11 +148,7 @@ public boolean asyncSendMessage(boolean sendInB2B, if (!this.isStarted()) { return procResult.setFailResult(ErrorCode.SDK_CLOSED); } - if (sendInB2B) { - return processEvent(SendQos.SOURCE_ACK, eventInfo, callback, procResult); - } else { - return processEvent(SendQos.SINK_ACK, eventInfo, callback, procResult); - } + return processEvent(SendQos.SINK_ACK, eventInfo, callback, procResult); } @Override diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java index 97543d86a5e..d36117614ca 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java @@ -28,6 +28,47 @@ */ public interface TcpMsgSender extends MessageSender { + /** + * Synchronously send message and wait for the final sending result + * DataProxy returns response as soon as it receives the request and + * forwards the message in B2B mode until it succeeds; + * + *

Attention: + * 1. if return false, the caller can choose to wait for a period of time before trying again, or + * discard the event after multiple retries and failures. + * 2. this method tries to ensure that messages are delivered, but there + * may be duplicate messages or message loss scenarios. It is suitable for scenarios with + * a very large number of reports, very low reporting time requirements, and + * the need to return the sending results. + *

+ * + * @param eventInfo the event information need to send + * @param procResult The send result, including the detail error infos if failed + * @return true if successful, false if failed for some reason. + */ + boolean sendMessage(TcpEventInfo eventInfo, ProcessResult procResult); + + /** + * Asynchronously send message and return immediately + * DataProxy returns response as soon as it receives the request and + * forwards the message in B2B mode until it succeeds; + * + *

Attention: + * 1. if return false, the caller can choose to wait for a period of time before trying again, or + * discard the event after multiple retries and failures. + * 2. this method, tries to ensure that messages are delivered, but there + * may be duplicate messages or message loss scenarios. It is suitable for scenarios with + * a very large number of reports, very low reporting time requirements, and + * the need to return the sending results. + *

+ * @param eventInfo the event information need to send + * @param callback the callback that returns the response from DataProxy or + * an exception that occurred while waiting for the response. + * @param procResult The send result, including the detail error infos if the event not accepted + * @return true if successful, false if the event not accepted for some reason. + */ + boolean asyncSendMessage(TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult procResult); + /** * Send message without response * @@ -47,55 +88,41 @@ public interface TcpMsgSender extends MessageSender { /** * Synchronously send message and wait for the final sending result + * DataProxy returns response after receiving the request and forwarding it successfully, + * and DataProxy does not retry on failure * *

Attention: * 1. if return false, the caller can choose to wait for a period of time before trying again, or * discard the event after multiple retries and failures. - * 2. this method, with sendInB2B = true, tries to ensure that messages are delivered, but there - * may be duplicate messages or message loss scenarios. It is suitable for scenarios with - * a very large number of reports, very low reporting time requirements, and - * the need to return the sending results. - * 3. this method, with sendInB2B = false, ensures that the message is delivered only once and + * 2. this method, ensures that the message is delivered only once and * will not be repeated. It is suitable for businesses with a small amount of reports and * no requirements on the reporting time, but require DataProxy to forward messages with high reliability. *

* - * @param sendInB2B indicates the DataProxy message service mode, true indicates DataProxy returns - * as soon as it receives the request and forwards the message in B2B mode until it succeeds; - * false indicates DataProxy returns after receiving the request and forwarding it successfully, - * and DataProxy does not retry on failure * @param eventInfo the event information need to send * @param procResult The send result, including the detail error infos if failed * @return true if successful, false if failed for some reason. */ - boolean syncSendMessage(boolean sendInB2B, - TcpEventInfo eventInfo, ProcessResult procResult); + boolean sendMsgWithSinkAck(TcpEventInfo eventInfo, ProcessResult procResult); /** - * Asynchronously send message + * Asynchronously send message and return immediately + * DataProxy returns response after receiving the request and forwarding it successfully, + * and DataProxy does not retry on failure * *

Attention: * 1. if return false, the caller can choose to wait for a period of time before trying again, or * discard the event after multiple retries and failures. - * 2. this method, with sendInB2B = true, tries to ensure that messages are delivered, but there - * may be duplicate messages or message loss scenarios. It is suitable for scenarios with - * a very large number of reports, very low reporting time requirements, and - * the need to return the sending results. - * 3. this method, with sendInB2B = false, ensures that the message is delivered only once and + * 3. this method, ensures that the message is delivered only once and * will not be repeated. It is suitable for businesses with a small amount of reports and * no requirements on the reporting time, but require DataProxy to forward messages with high reliability. *

* - * @param sendInB2B indicates the DataProxy message service mode, true indicates DataProxy returns - * as soon as it receives the request and forwards the message in B2B mode until it succeeds; - * false indicates DataProxy returns after receiving the request and forwarding it successfully, - * and DataProxy does not retry on failure * @param eventInfo the event information need to send * @param callback the callback that returns the response from DataProxy or * an exception that occurred while waiting for the response. * @param procResult The send result, including the detail error infos if the event not accepted * @return true if successful, false if the event not accepted for some reason. */ - boolean asyncSendMessage(boolean sendInB2B, - TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult procResult); + boolean asyncSendMsgWithSinkAck(TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult procResult); } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java index 2eace638a20..1e72c2bf9cf 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java @@ -19,7 +19,9 @@ import org.apache.inlong.common.msg.AttributeConstants; import org.apache.inlong.common.msg.MsgType; +import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig; import org.apache.inlong.sdk.dataproxy.common.SdkConsts; +import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; import org.apache.commons.lang3.StringUtils; @@ -124,6 +126,20 @@ public static boolean sleepSomeTime(long sleepTimeMs) { } } + public static void validProxyConfigNotNull(ProxyClientConfig configure) throws ProxySdkException { + if (configure == null) { + throw new ProxySdkException("configure is null!"); + } + } + + public static String buildClusterIdKey(String protocol, String regionName, Integer clusterId) { + return clusterId + ":" + regionName + ":" + protocol; + } + + public static String buildGroupIdConfigKey(String protocol, String regionName, String groupId) { + return protocol + ":" + regionName + ":" + groupId; + } + public static boolean isAttrKeysValid(Map attrsMap) { if (attrsMap == null || attrsMap.size() == 0) { return false;