From 59211cca538a372742b465710e2b6b3eef8d9c2d Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Wed, 22 Jan 2025 16:28:48 +0800 Subject: [PATCH 1/2] [INLONG-11706][SDK] Optimize HTTP Sender implementation --- .../sdk/dataproxy/BaseMsgSenderFactory.java | 112 ++++- .../sdk/dataproxy/MsgSenderFactory.java | 20 + .../sdk/dataproxy/MsgSenderMultiFactory.java | 21 +- .../sdk/dataproxy/MsgSenderSingleFactory.java | 20 +- .../sdk/dataproxy/common/ErrorCode.java | 10 + .../sdk/dataproxy/common/SdkConsts.java | 4 + .../dataproxy/http/InternalHttpSender.java | 3 +- .../dataproxy/network/HttpProxySender.java | 2 + .../dataproxy/network/http/HttpAsyncObj.java | 51 ++ .../dataproxy/network/http/HttpClientMgr.java | 461 ++++++++++++++++++ .../network/http/HttpContentType.java | 27 + .../sdk/dataproxy/sender/BaseSender.java | 2 + .../sender/http/HttpMsgSenderConfig.java | 2 +- .../sender/http/InLongHttpMsgSender.java | 136 ++++++ .../inlong/sdk/dataproxy/utils/HttpUtils.java | 71 +++ 15 files changed, 925 insertions(+), 17 deletions(-) create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpAsyncObj.java create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpContentType.java create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/HttpUtils.java diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java index 0bc0cf4bb9..50fa183c58 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java @@ -23,6 +23,8 @@ import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager; import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; import org.apache.inlong.sdk.dataproxy.sender.BaseSender; +import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig; +import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; import org.apache.inlong.sdk.dataproxy.utils.LogCounter; @@ -69,9 +71,9 @@ public void close() { senderCacheLock.writeLock().lock(); try { // release groupId mapped senders - totalSenderCnt = innReleaseAllGroupIdSenders(groupIdSenderMap); + totalSenderCnt = releaseAllGroupIdSenders(groupIdSenderMap); // release clusterId mapped senders - totalSenderCnt += innReleaseAllClusterIdSenders(clusterIdSenderMap); + totalSenderCnt += releaseAllClusterIdSenders(clusterIdSenderMap); } finally { senderCacheLock.writeLock().unlock(); } @@ -90,9 +92,9 @@ public void removeClient(BaseSender msgSender) { senderCacheLock.writeLock().lock(); try { if (msgSender.getFactoryClusterIdKey() == null) { - removed = innRemoveGroupIdSender(msgSender, groupIdSenderMap); + removed = removeGroupIdSender(msgSender, groupIdSenderMap); } else { - removed = innRemoveClusterIdSender(msgSender, clusterIdSenderMap); + removed = removeClusterIdSender(msgSender, clusterIdSenderMap); } } finally { senderCacheLock.writeLock().unlock(); @@ -108,7 +110,7 @@ public int getMsgSenderCount() { public InLongTcpMsgSender genTcpSenderByGroupId( TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException { - ProxyUtils.validProxyConfigNotNull(configure); + validProxyConfigNotNull(configure); // query cached sender String metaConfigKey = configure.getGroupMetaConfigKey(); InLongTcpMsgSender messageSender = @@ -148,9 +150,51 @@ public InLongTcpMsgSender genTcpSenderByGroupId( } } + public InLongHttpMsgSender genHttpSenderByGroupId( + HttpMsgSenderConfig configure) throws ProxySdkException { + validProxyConfigNotNull(configure); + // query cached sender + String metaConfigKey = configure.getGroupMetaConfigKey(); + InLongHttpMsgSender messageSender = + (InLongHttpMsgSender) groupIdSenderMap.get(metaConfigKey); + if (messageSender != null) { + return messageSender; + } + // valid configure info + ProcessResult procResult = new ProcessResult(); + qryProxyMetaConfigure(configure, procResult); + // generate sender + senderCacheLock.writeLock().lock(); + try { + // re-get the created sender based on the groupId key after locked + messageSender = (InLongHttpMsgSender) groupIdSenderMap.get(metaConfigKey); + if (messageSender != null) { + return messageSender; + } + // build a new sender based on groupId + messageSender = new InLongHttpMsgSender(configure, msgSenderFactory, null); + if (!messageSender.start(procResult)) { + messageSender.close(); + throw new ProxySdkException("Failed to start groupId sender: " + procResult); + } + groupIdSenderMap.put(metaConfigKey, messageSender); + logger.info("MsgSenderFactory({}) generated a new groupId({}) sender({})", + this.factoryNo, metaConfigKey, messageSender.getSenderId()); + return messageSender; + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.warn("MsgSenderFactory({}) build groupId sender({}) exception", + this.factoryNo, metaConfigKey, ex); + } + throw new ProxySdkException("Failed to build groupId sender: " + ex.getMessage()); + } finally { + senderCacheLock.writeLock().unlock(); + } + } + public InLongTcpMsgSender genTcpSenderByClusterId( TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException { - ProxyUtils.validProxyConfigNotNull(configure); + validProxyConfigNotNull(configure); // get groupId's clusterIdKey ProcessResult procResult = new ProcessResult(); ProxyConfigEntry proxyConfigEntry = qryProxyMetaConfigure(configure, procResult);; @@ -191,6 +235,48 @@ public InLongTcpMsgSender genTcpSenderByClusterId( } } + public InLongHttpMsgSender genHttpSenderByClusterId( + HttpMsgSenderConfig configure) throws ProxySdkException { + validProxyConfigNotNull(configure); + // get groupId's clusterIdKey + ProcessResult procResult = new ProcessResult(); + ProxyConfigEntry proxyConfigEntry = qryProxyMetaConfigure(configure, procResult);; + String clusterIdKey = ProxyUtils.buildClusterIdKey( + configure.getDataRptProtocol(), configure.getRegionName(), proxyConfigEntry.getClusterId()); + // get local built sender + InLongHttpMsgSender messageSender = (InLongHttpMsgSender) clusterIdSenderMap.get(clusterIdKey); + if (messageSender != null) { + return messageSender; + } + // generate sender + senderCacheLock.writeLock().lock(); + try { + // re-get the created sender based on the clusterId Key after locked + messageSender = (InLongHttpMsgSender) clusterIdSenderMap.get(clusterIdKey); + if (messageSender != null) { + return messageSender; + } + // build a new sender based on clusterId Key + messageSender = new InLongHttpMsgSender(configure, msgSenderFactory, clusterIdKey); + if (!messageSender.start(procResult)) { + messageSender.close(); + throw new ProxySdkException("Failed to start cluster sender: " + procResult); + } + clusterIdSenderMap.put(clusterIdKey, messageSender); + logger.info("MsgSenderFactory({}) generated a new clusterId({}) sender({})", + this.factoryNo, clusterIdKey, messageSender.getSenderId()); + return messageSender; + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.warn("MsgSenderFactory({}) build cluster sender({}) exception", + this.factoryNo, clusterIdKey, ex); + } + throw new ProxySdkException("Failed to build cluster sender: " + ex.getMessage()); + } finally { + senderCacheLock.writeLock().unlock(); + } + } + private ProxyConfigEntry qryProxyMetaConfigure( ProxyClientConfig proxyConfig, ProcessResult procResult) throws ProxySdkException { ProxyConfigManager inlongMetaQryMgr = new ProxyConfigManager(proxyConfig); @@ -205,7 +291,7 @@ private ProxyConfigEntry qryProxyMetaConfigure( return inlongMetaQryMgr.getProxyConfigEntry(); } - private boolean innRemoveGroupIdSender(BaseSender msgSender, Map senderMap) { + private boolean removeGroupIdSender(BaseSender msgSender, Map senderMap) { BaseSender tmpSender = senderMap.get(msgSender.getMetaConfigKey()); if (tmpSender == null || !tmpSender.getSenderId().equals(msgSender.getSenderId())) { @@ -214,7 +300,7 @@ private boolean innRemoveGroupIdSender(BaseSender msgSender, Map senderMap) { + private boolean removeClusterIdSender(BaseSender msgSender, Map senderMap) { BaseSender tmpSender = senderMap.get(msgSender.getFactoryClusterIdKey()); if (tmpSender == null || !tmpSender.getSenderId().equals(msgSender.getSenderId())) { @@ -223,7 +309,7 @@ private boolean innRemoveClusterIdSender(BaseSender msgSender, Map senderMap) { + private int releaseAllGroupIdSenders(Map senderMap) { int totalSenderCnt = 0; for (Map.Entry entry : senderMap.entrySet()) { if (entry == null || entry.getValue() == null) { @@ -243,7 +329,7 @@ private int innReleaseAllGroupIdSenders(Map senderMap) { return totalSenderCnt; } - private int innReleaseAllClusterIdSenders(Map senderMap) { + private int releaseAllClusterIdSenders(Map senderMap) { int totalSenderCnt = 0; for (Map.Entry entry : senderMap.entrySet()) { if (entry == null @@ -264,4 +350,10 @@ private int innReleaseAllClusterIdSenders(Map senderMap) { senderMap.clear(); return totalSenderCnt; } + + private void validProxyConfigNotNull(ProxyClientConfig configure) throws ProxySdkException { + if (configure == null) { + throw new ProxySdkException("configure is null!"); + } + } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java index a6a4e20b4c..d2169d7152 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java @@ -19,6 +19,8 @@ import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; import org.apache.inlong.sdk.dataproxy.sender.BaseSender; +import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig; +import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; @@ -88,4 +90,22 @@ InLongTcpMsgSender genTcpSenderByClusterId( */ InLongTcpMsgSender genTcpSenderByClusterId( TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException; + + /** + * Get or generate a http sender from the factory according to groupId + * + * @param configure the sender configure + * @return the sender + */ + InLongHttpMsgSender genHttpSenderByGroupId( + HttpMsgSenderConfig configure) throws ProxySdkException; + + /** + * Get or generate a http sender from the factory according to clusterId + * + * @param configure the sender configure + * @return the sender + */ + InLongHttpMsgSender genHttpSenderByClusterId( + HttpMsgSenderConfig configure) throws ProxySdkException; } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java index 0cb595c261..f42d8b4d61 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java @@ -19,9 +19,10 @@ import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; import org.apache.inlong.sdk.dataproxy.sender.BaseSender; +import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig; +import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; -import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; @@ -79,7 +80,6 @@ public InLongTcpMsgSender genTcpSenderByGroupId( if (!this.initialized.get()) { throw new ProxySdkException("Please initialize the factory first!"); } - ProxyUtils.validProxyConfigNotNull(configure); return this.baseMsgSenderFactory.genTcpSenderByGroupId(configure, selfDefineFactory); } @@ -95,7 +95,22 @@ public InLongTcpMsgSender genTcpSenderByClusterId( if (!this.initialized.get()) { throw new ProxySdkException("Please initialize the factory first!"); } - ProxyUtils.validProxyConfigNotNull(configure); return this.baseMsgSenderFactory.genTcpSenderByClusterId(configure, selfDefineFactory); } + + @Override + public InLongHttpMsgSender genHttpSenderByGroupId(HttpMsgSenderConfig configure) throws ProxySdkException { + if (!this.initialized.get()) { + throw new ProxySdkException("Please initialize the factory first!"); + } + return this.baseMsgSenderFactory.genHttpSenderByGroupId(configure); + } + + @Override + public InLongHttpMsgSender genHttpSenderByClusterId(HttpMsgSenderConfig configure) throws ProxySdkException { + if (!this.initialized.get()) { + throw new ProxySdkException("Please initialize the factory first!"); + } + return this.baseMsgSenderFactory.genHttpSenderByClusterId(configure); + } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java index 91b1735155..ad891c971b 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java @@ -19,6 +19,8 @@ import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; import org.apache.inlong.sdk.dataproxy.sender.BaseSender; +import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig; +import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; @@ -86,7 +88,6 @@ public InLongTcpMsgSender genTcpSenderByGroupId( if (!initialized.get()) { throw new ProxySdkException("Please initialize the factory first!"); } - ProxyUtils.validProxyConfigNotNull(configure); return baseMsgSenderFactory.genTcpSenderByGroupId(configure, selfDefineFactory); } @@ -102,7 +103,22 @@ public InLongTcpMsgSender genTcpSenderByClusterId( if (!initialized.get()) { throw new ProxySdkException("Please initialize the factory first!"); } - ProxyUtils.validProxyConfigNotNull(configure); return baseMsgSenderFactory.genTcpSenderByClusterId(configure, selfDefineFactory); } + + @Override + public InLongHttpMsgSender genHttpSenderByGroupId(HttpMsgSenderConfig configure) throws ProxySdkException { + if (!initialized.get()) { + throw new ProxySdkException("Please initialize the factory first!"); + } + return baseMsgSenderFactory.genHttpSenderByGroupId(configure); + } + + @Override + public InLongHttpMsgSender genHttpSenderByClusterId(HttpMsgSenderConfig configure) throws ProxySdkException { + if (!initialized.get()) { + throw new ProxySdkException("Please initialize the factory first!"); + } + return baseMsgSenderFactory.genHttpSenderByClusterId(configure); + } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java index 187a1b56b9..b1dd6f26c3 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java @@ -93,6 +93,16 @@ public enum ErrorCode { DP_UNCONFIGURED_GROUPID_OR_STREAMID(155, "DataProxy return unconfigured groupId or streamId"), // DP_RECEIVE_FAILURE(160, "DataProxy return message receive failure"), + // + HTTP_ASYNC_POOL_FULL(171, "Http async pool full"), + HTTP_ASYNC_OFFER_FAIL(172, "Http async offer event fail"), + HTTP_ASYNC_OFFER_EXCEPTION(173, "Http async offer event exception"), + HTTP_BUILD_CLIENT_EXCEPTION(174, "Http build client exception"), + // + BUILD_FORM_CONTENT_EXCEPTION(181, "Build form content exception"), + DP_RETURN_FAILURE(182, "DataProxy return failure"), + HTTP_VISIT_DP_EXCEPTION(183, "Http visit exception"), + DP_RETURN_UNKNOWN_ERROR(184, "DataProxy return unknown error"), UNKNOWN_ERROR(9999, "Unknown error"); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java index cf2d006742..948088bc8a 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java @@ -21,6 +21,8 @@ public class SdkConsts { public static String PREFIX_HTTP = "http://"; public static String PREFIX_HTTPS = "https://"; + public static final String KEY_HTTP_FIELD_BODY = "body"; + public static final String KEY_HTTP_FIELD_DELIMITER = "rcdDlmtr"; // dataproxy node config public static final String MANAGER_DATAPROXY_API = "/inlong/manager/openapi/dataproxy/getIpList/"; @@ -32,6 +34,8 @@ public class SdkConsts { public static final String BASIC_AUTH_HEADER = "authorization"; // default region name public static final String VAL_DEF_REGION_NAME = ""; + // http report method + public static final String DATAPROXY_REPORT_METHOD = "/dataproxy/message"; // config info sync interval in minutes public static final int VAL_DEF_CONFIG_SYNC_INTERVAL_MIN = 3; public static final int VAL_MIN_CONFIG_SYNC_INTERVAL_MIN = 1; diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java index 51452d528a..382ba58ad8 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java @@ -52,8 +52,9 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +@Deprecated /** - * internal http sender + * Replace by InLongHttpMsgSender */ public class InternalHttpSender { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java index 1b5653bc46..bd06d38b7f 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java @@ -37,8 +37,10 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +@Deprecated /** * http sender + * Replace by InLongHttpMsgSender */ public class HttpProxySender extends Thread { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpAsyncObj.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpAsyncObj.java new file mode 100644 index 0000000000..e58644fb29 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpAsyncObj.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.network.http; + +import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback; +import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo; + +/** + * HTTP Asynchronously Object class + * + * Used to carry the reported message content + */ +public class HttpAsyncObj { + + private final HttpEventInfo httpEvent; + private final MsgSendCallback callback; + private final long rptMs; + + public HttpAsyncObj(HttpEventInfo httpEvent, MsgSendCallback callback) { + this.httpEvent = httpEvent; + this.callback = callback; + this.rptMs = System.currentTimeMillis(); + } + + public HttpEventInfo getHttpEvent() { + return httpEvent; + } + + public MsgSendCallback getCallback() { + return callback; + } + + public long getRptMs() { + return rptMs; + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java new file mode 100644 index 0000000000..4e875359cf --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java @@ -0,0 +1,461 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.network.http; + +import org.apache.inlong.common.enums.DataProxyErrCode; +import org.apache.inlong.common.msg.AttributeConstants; +import org.apache.inlong.sdk.dataproxy.common.ErrorCode; +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; +import org.apache.inlong.sdk.dataproxy.common.SdkConsts; +import org.apache.inlong.sdk.dataproxy.config.HostInfo; +import org.apache.inlong.sdk.dataproxy.network.ClientMgr; +import org.apache.inlong.sdk.dataproxy.sender.BaseSender; +import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo; +import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig; +import org.apache.inlong.sdk.dataproxy.utils.HttpUtils; +import org.apache.inlong.sdk.dataproxy.utils.LogCounter; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.netty.handler.codec.http.HttpHeaderValues; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHeaders; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.utils.URLEncodedUtils; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * HTTP Client Manager class + * + * Used to manage HTTP clients, including periodically selecting proxy nodes, + * finding available nodes when reporting messages, maintaining inflight message + * sending status, finding responses to corresponding requests, etc. + */ +public class HttpClientMgr implements ClientMgr { + + private static final Logger logger = LoggerFactory.getLogger(HttpClientMgr.class); + private static final LogCounter updConExptCnt = new LogCounter(10, 100000, 60 * 1000L); + private static final LogCounter sendMsgExptCnt = new LogCounter(10, 100000, 60 * 1000L); + private static final LogCounter asyncSendExptCnt = new LogCounter(10, 100000, 60 * 1000L); + + private final BaseSender sender; + private final HttpMsgSenderConfig httpConfig; + private CloseableHttpClient httpClient; + private final LinkedBlockingQueue messageCache; + private final Semaphore asyncIdleCellCnt; + private final ExecutorService workerServices = Executors.newCachedThreadPool(); + private final AtomicBoolean shutDown = new AtomicBoolean(false); + // meta info + private ConcurrentHashMap usingNodeMaps = new ConcurrentHashMap<>(); + private final ConcurrentHashMap connFailNodeMap = new ConcurrentHashMap<>(); + // current using nodes + private List activeNodes = new ArrayList<>(); + private volatile long lastUpdateTime = -1; + // node select index + private final AtomicInteger reqSendIndex = new AtomicInteger(0); + + public HttpClientMgr(BaseSender sender, HttpMsgSenderConfig httpConfig) { + this.sender = sender; + this.httpConfig = httpConfig; + this.messageCache = new LinkedBlockingQueue<>(httpConfig.getHttpAsyncRptCacheSize()); + this.asyncIdleCellCnt = new Semaphore(httpConfig.getHttpAsyncRptCacheSize(), true); + } + + @Override + public boolean start(ProcessResult procResult) { + // build http client + if (!HttpUtils.constructHttpClient(httpConfig.isRptDataByHttps(), + httpConfig.getHttpSocketTimeoutMs(), httpConfig.getHttpConTimeoutMs(), + httpConfig.getTlsVersion(), procResult)) { + return false; + } + this.httpClient = (CloseableHttpClient) procResult.getRetData(); + // build async report workers + for (int i = 0; i < httpConfig.getHttpAsyncRptWorkerNum(); i++) { + workerServices.execute(new HttpAsyncReportWorker(i)); + } + logger.info("ClientMgr({}) started!", this.sender.getSenderId()); + return procResult.setSuccess(); + } + + /** + * close resources + */ + @Override + public void stop() { + if (!this.shutDown.compareAndSet(false, true)) { + return; + } + int remainCnt = 0; + if (!messageCache.isEmpty()) { + long startTime = System.currentTimeMillis(); + while (!messageCache.isEmpty()) { + if (System.currentTimeMillis() - startTime >= httpConfig.getHttpCloseWaitPeriodMs()) { + break; + } + ProxyUtils.sleepSomeTime(100L); + } + remainCnt = messageCache.size(); + messageCache.clear(); + } + workerServices.shutdown(); + if (httpClient != null) { + try { + httpClient.close(); + } catch (Throwable ignore) { + // + } + } + logger.info("ClientMgr({}) stopped, remain ({}) messages discarded!", + this.sender.getSenderId(), remainCnt); + } + + @Override + public int getInflightMsgCnt() { + return this.messageCache.size(); + } + + @Override + public int getActiveNodeCnt() { + return activeNodes.size(); + } + + @Override + public void updateProxyInfoList(boolean nodeChanged, ConcurrentHashMap hostInfoMap) { + if (hostInfoMap.isEmpty() || this.shutDown.get()) { + return; + } + long curTime = System.currentTimeMillis(); + try { + // shuffle candidate nodes + List candidateNodes = new ArrayList<>(hostInfoMap.size()); + candidateNodes.addAll(hostInfoMap.values()); + Collections.sort(candidateNodes); + Collections.shuffle(candidateNodes); + int curTotalCnt = candidateNodes.size(); + int needActiveCnt = Math.min(httpConfig.getAliveConnections(), curTotalCnt); + // build next step nodes + Long lstFailTime; + int maxCycleCnt = 3; + this.connFailNodeMap.clear(); + List realHosts = new ArrayList<>(); + ConcurrentHashMap tmpNodeMaps = new ConcurrentHashMap<>(); + do { + int selectCnt = 0; + long selectTime = System.currentTimeMillis(); + for (HostInfo hostInfo : candidateNodes) { + if (realHosts.contains(hostInfo.getReferenceName())) { + continue; + } + lstFailTime = this.connFailNodeMap.get(hostInfo.getReferenceName()); + if (lstFailTime != null + && selectTime - lstFailTime <= httpConfig.getHttpNodeReuseWaitIfFailMs()) { + continue; + } + tmpNodeMaps.put(hostInfo.getReferenceName(), hostInfo); + realHosts.add(hostInfo.getReferenceName()); + if (lstFailTime != null) { + this.connFailNodeMap.remove(hostInfo.getReferenceName()); + } + if (++selectCnt >= needActiveCnt) { + break; + } + } + if (!realHosts.isEmpty()) { + break; + } + ProxyUtils.sleepSomeTime(1000L); + } while (--maxCycleCnt > 0); + // update active nodes + if (realHosts.isEmpty()) { + if (nodeChanged) { + logger.error("ClientMgr({}) changed nodes, but all nodes failure, nodes={}, failNodes={}!", + this.sender.getSenderId(), candidateNodes, connFailNodeMap); + } else { + logger.error("ClientMgr({}) re-choose nodes, but all nodes failure, nodes={}, failNodes={}!", + this.sender.getSenderId(), candidateNodes, connFailNodeMap); + } + } else { + this.lastUpdateTime = System.currentTimeMillis(); + this.usingNodeMaps = tmpNodeMaps; + this.activeNodes = realHosts; + if (nodeChanged) { + logger.info("ClientMgr({}) changed nodes, wast {}ms, nodeCnt=(r:{}-a:{}), actives={}, fail={}", + this.sender.getSenderId(), (System.currentTimeMillis() - curTime), + needActiveCnt, realHosts.size(), realHosts, connFailNodeMap.keySet()); + } else { + logger.info("ClientMgr({}) re-choose nodes, wast {}ms, nodeCnt=(r:{}-a:{}), actives={}, fail={}", + this.sender.getSenderId(), (System.currentTimeMillis() - curTime), + needActiveCnt, realHosts.size(), realHosts, connFailNodeMap.keySet()); + } + } + } catch (Throwable ex) { + if (updConExptCnt.shouldPrint()) { + logger.warn("ClientMgr({}) update nodes throw exception", + this.sender.getSenderId(), ex); + } + } + } + + public boolean asyncSendMessage(HttpAsyncObj asyncObj, ProcessResult procResult) { + if (this.shutDown.get()) { + return procResult.setFailResult(ErrorCode.SDK_CLOSED); + } + List curNodes = this.activeNodes; + if (curNodes.isEmpty()) { + return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET); + } + if (!this.asyncIdleCellCnt.tryAcquire()) { + return procResult.setFailResult(ErrorCode.HTTP_ASYNC_POOL_FULL); + } + boolean released = false; + try { + if (!this.messageCache.offer(asyncObj)) { + this.asyncIdleCellCnt.release(); + released = true; + return procResult.setFailResult(ErrorCode.HTTP_ASYNC_OFFER_FAIL); + } + return procResult.setSuccess(); + } catch (Throwable ex) { + if (!released) { + this.asyncIdleCellCnt.release(); + } + if (asyncSendExptCnt.shouldPrint()) { + logger.warn("ClientMgr({}) async offer event exception", this.sender.getSenderId(), ex); + } + return procResult.setFailResult(ErrorCode.HTTP_ASYNC_OFFER_EXCEPTION, ex.getMessage()); + } + } + + /** + * send message to remote nodes + */ + public boolean sendMessage(HttpEventInfo httpEvent, ProcessResult procResult) { + if (this.shutDown.get()) { + return procResult.setFailResult(ErrorCode.SDK_CLOSED); + } + List curNodes = this.activeNodes; + int curNodeSize = curNodes.size(); + if (curNodeSize == 0) { + return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET); + } + String curNode; + HostInfo hostInfo; + Long lstFailTime; + int nullNodeCnt = 0; + HostInfo back1thNode = null; + long nodeSelectTime = System.currentTimeMillis(); + int startPos = reqSendIndex.getAndIncrement(); + for (int index = 0; index < curNodeSize; index++) { + curNode = curNodes.get(Math.abs(startPos++) % curNodeSize); + hostInfo = usingNodeMaps.get(curNode); + if (hostInfo == null) { + nullNodeCnt++; + continue; + } + lstFailTime = connFailNodeMap.get(hostInfo.getReferenceName()); + if (lstFailTime != null) { + if (nodeSelectTime - lstFailTime <= httpConfig.getHttpNodeReuseWaitIfFailMs()) { + back1thNode = hostInfo; + continue; + } + connFailNodeMap.remove(hostInfo.getReferenceName(), lstFailTime); + } + return innSendMsgByHttp(httpEvent, hostInfo, procResult); + } + if (nullNodeCnt == curNodeSize) { + return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET); + } + if (back1thNode != null) { + return innSendMsgByHttp(httpEvent, back1thNode, procResult); + } + return procResult.setFailResult(ErrorCode.NO_VALID_REMOTE_NODE); + } + + /** + * send request to DataProxy over http + */ + private boolean innSendMsgByHttp(HttpEventInfo httpEvent, HostInfo hostInfo, ProcessResult procResult) { + String rmtRptUrl = (httpConfig.isRptDataByHttps() ? SdkConsts.PREFIX_HTTPS : SdkConsts.PREFIX_HTTP) + + hostInfo.getReferenceName() + + SdkConsts.DATAPROXY_REPORT_METHOD; + if (!buildFormUrlPost(rmtRptUrl, httpEvent, procResult)) { + return false; + } + HttpPost httpPost = (HttpPost) procResult.getRetData(); + CloseableHttpResponse response = null; + try { + response = httpClient.execute(httpPost); + String returnStr = EntityUtils.toString(response.getEntity()); + int returnCode = response.getStatusLine().getStatusCode(); + if (HttpStatus.SC_OK != returnCode) { + if (sendMsgExptCnt.shouldPrint()) { + logger.warn("ClientMgr({}) report event failure, errCode={}, returnStr={}", + this.sender.getSenderId(), returnCode, returnStr); + } + if (response.getStatusLine().getStatusCode() >= 500) { + this.connFailNodeMap.put(hostInfo.getReferenceName(), System.currentTimeMillis()); + } + return procResult.setFailResult(ErrorCode.RMT_RETURN_FAILURE, + response.getStatusLine().getStatusCode() + ":" + returnStr); + } + if (StringUtils.isBlank(returnStr)) { + return procResult.setFailResult(ErrorCode.RMT_RETURN_BLANK_CONTENT); + } + if (logger.isDebugEnabled()) { + logger.debug("success to report event, url={}, result={}", + rmtRptUrl, returnStr); + } + JsonObject jsonResponse = JsonParser.parseString(returnStr).getAsJsonObject(); + JsonElement codeElement = jsonResponse.get("code"); + JsonElement msgElement = jsonResponse.get("msg"); + if (codeElement != null) { + int errCode = codeElement.getAsInt(); + if (errCode == DataProxyErrCode.SUCCESS.getErrCode()) { + return procResult.setSuccess(); + } else { + return procResult.setFailResult(ErrorCode.DP_RETURN_FAILURE, + errCode + ":" + (msgElement != null ? msgElement.getAsString() : "")); + } + } + return procResult.setFailResult(ErrorCode.DP_RETURN_UNKNOWN_ERROR, returnStr); + } catch (Throwable ex) { + if (sendMsgExptCnt.shouldPrint()) { + logger.warn("ClientMgr({}) report event exception, url={}", + this.sender.getSenderId(), rmtRptUrl, ex); + } + return procResult.setFailResult(ErrorCode.HTTP_VISIT_DP_EXCEPTION, ex.getMessage()); + } finally { + if (httpPost != null) { + httpPost.releaseConnection(); + } + if (response != null) { + try { + response.close(); + } catch (Throwable ex) { + if (sendMsgExptCnt.shouldPrint()) { + logger.warn("ClientMgr({}) close response exception, url={}", + this.sender.getSenderId(), rmtRptUrl, ex); + } + } + } + } + } + + private boolean buildFormUrlPost( + String rmtRptUrl, HttpEventInfo httpEvent, ProcessResult procResult) { + ArrayList contents = new ArrayList<>(); + try { + HttpPost httpPost = new HttpPost(rmtRptUrl); + httpPost.setHeader(HttpHeaders.CONNECTION, + HttpHeaderValues.CLOSE.toString()); + httpPost.setHeader(HttpHeaders.CONTENT_TYPE, + HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString()); + contents.add(new BasicNameValuePair(AttributeConstants.GROUP_ID, + httpEvent.getGroupId())); + contents.add(new BasicNameValuePair(AttributeConstants.STREAM_ID, + httpEvent.getStreamId())); + contents.add(new BasicNameValuePair(AttributeConstants.DATA_TIME, + String.valueOf(httpEvent.getDtMs()))); + contents.add(new BasicNameValuePair(SdkConsts.KEY_HTTP_FIELD_BODY, + StringUtils.join(httpEvent.getBodyList(), httpConfig.getHttpEventsSeparator()))); + contents.add(new BasicNameValuePair(AttributeConstants.MESSAGE_COUNT, + String.valueOf(httpEvent.getMsgCnt()))); + if (!httpConfig.isSepEventByLF()) { + contents.add(new BasicNameValuePair(SdkConsts.KEY_HTTP_FIELD_DELIMITER, + httpConfig.getHttpEventsSeparator())); + } + String encodedContents = URLEncodedUtils.format(contents, StandardCharsets.UTF_8); + httpPost.setEntity(new StringEntity(encodedContents)); + if (logger.isDebugEnabled()) { + logger.debug("begin to post request to {}, encoded content is: {}", + rmtRptUrl, encodedContents); + } + return procResult.setSuccess(httpPost); + } catch (Throwable ex) { + if (sendMsgExptCnt.shouldPrint()) { + logger.warn("ClientMgr({}) build form-url content failure, content={}", + this.sender.getSenderId(), contents, ex); + } + return procResult.setFailResult(ErrorCode.BUILD_FORM_CONTENT_EXCEPTION, ex.getMessage()); + } + } + + /** + * check cache runner + */ + private class HttpAsyncReportWorker implements Runnable { + + private final String workerId; + + public HttpAsyncReportWorker(int workerId) { + this.workerId = sender.getSenderId() + "-" + workerId; + } + + @Override + public void run() { + long curTime = 0; + HttpAsyncObj asyncObj; + ProcessResult procResult = new ProcessResult(); + logger.info("HttpAsyncReportWorker({}) started", this.workerId); + // if not shutdown or queue is not empty + while (!shutDown.get() || !messageCache.isEmpty()) { + while (!messageCache.isEmpty()) { + asyncObj = messageCache.poll(); + if (asyncObj == null) { + continue; + } + try { + sendMessage(asyncObj.getHttpEvent(), procResult); + curTime = System.currentTimeMillis(); + asyncObj.getCallback().onMessageAck(procResult); + } catch (Throwable ex) { + if (asyncSendExptCnt.shouldPrint()) { + logger.error("HttpAsync({}) report event exception", workerId, ex); + } + } finally { + asyncIdleCellCnt.release(); + } + } + ProxyUtils.sleepSomeTime(httpConfig.getHttpAsyncWorkerIdleWaitMs()); + } + logger.info("HttpAsyncReportWorker({}) stopped", this.workerId); + } + } +} \ No newline at end of file diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpContentType.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpContentType.java new file mode 100644 index 0000000000..9020b6b7d6 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpContentType.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.network.http; + +/** + * HTTP Report Content Type enum + * + * This enumeration defines of HTTP reporting supported content types. + */ +public enum HttpContentType { + APPLICATION_X_WWW_FORM_URLENCODED +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java index c103bd31f4..3b6485eac3 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java @@ -83,6 +83,8 @@ protected BaseSender(ProxyClientConfig configure, MsgSenderFactory senderFactory this.senderFactory = senderFactory; this.factoryClusterIdKey = clusterIdKey; this.senderId = configure.getDataRptProtocol() + "-" + senderIdGen.incrementAndGet(); + this.configManager = new ProxyConfigManager(this.senderId, this.baseConfig, this); + this.configManager.setDaemon(true); } public boolean start(ProcessResult procResult) { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java index 6c7e5ac8b0..d05f4c821e 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java @@ -18,11 +18,11 @@ package org.apache.inlong.sdk.dataproxy.sender.http; import org.apache.inlong.common.msg.AttributeConstants; -import org.apache.inlong.sdk.dataproxy.common.HttpContentType; import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig; import org.apache.inlong.sdk.dataproxy.common.ReportProtocol; import org.apache.inlong.sdk.dataproxy.common.SdkConsts; import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; +import org.apache.inlong.sdk.dataproxy.network.http.HttpContentType; import org.apache.commons.lang3.StringUtils; diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java new file mode 100644 index 0000000000..e7d0b44a34 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.sender.http; + +import org.apache.inlong.sdk.dataproxy.MsgSenderFactory; +import org.apache.inlong.sdk.dataproxy.common.ErrorCode; +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; +import org.apache.inlong.sdk.dataproxy.common.SdkConsts; +import org.apache.inlong.sdk.dataproxy.network.http.HttpAsyncObj; +import org.apache.inlong.sdk.dataproxy.network.http.HttpClientMgr; +import org.apache.inlong.sdk.dataproxy.sender.BaseSender; +import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback; +import org.apache.inlong.sdk.dataproxy.utils.LogCounter; + +/** + * HTTP(s) Message Sender class + * + * Used to define the HTTP(s) sender common methods + */ +public class InLongHttpMsgSender extends BaseSender implements HttpMsgSender { + + protected static final LogCounter httpExceptCnt = new LogCounter(10, 100000, 60 * 1000L); + private final HttpClientMgr httpClientMgr; + private final HttpMsgSenderConfig httpConfig; + + public InLongHttpMsgSender(HttpMsgSenderConfig configure) { + this(configure, null, null); + } + + public InLongHttpMsgSender(HttpMsgSenderConfig configure, MsgSenderFactory senderFactory, String clusterIdKey) { + super(configure, senderFactory, clusterIdKey); + this.httpConfig = (HttpMsgSenderConfig) baseConfig; + this.clientMgr = new HttpClientMgr(this, this.httpConfig); + this.httpClientMgr = (HttpClientMgr) clientMgr; + } + + @Override + public boolean syncSendMessage(HttpEventInfo eventInfo, ProcessResult procResult) { + validParamsNotNull(eventInfo, procResult); + if (!this.isStarted()) { + return procResult.setFailResult(ErrorCode.SDK_CLOSED); + } + if (this.isMetaInfoUnReady()) { + return procResult.setFailResult(ErrorCode.NO_NODE_META_INFOS); + } + // check package length + if (!isValidPkgLength(eventInfo, this.getAllowedPkgLength(), procResult)) { + return false; + } + return httpClientMgr.sendMessage(eventInfo, procResult); + } + + @Override + public boolean asyncSendMessage(HttpEventInfo eventInfo, MsgSendCallback callback, ProcessResult procResult) { + validParamsNotNull(eventInfo, callback, procResult); + if (!this.isStarted()) { + return procResult.setFailResult(ErrorCode.SDK_CLOSED); + } + if (this.isMetaInfoUnReady()) { + return procResult.setFailResult(ErrorCode.NO_NODE_META_INFOS); + } + // check package length + if (!isValidPkgLength(eventInfo, this.getAllowedPkgLength(), procResult)) { + return false; + } + return httpClientMgr.asyncSendMessage(new HttpAsyncObj(eventInfo, callback), procResult); + } + + @Override + public int getActiveNodeCnt() { + return httpClientMgr.getActiveNodeCnt(); + } + + @Override + public int getInflightMsgCnt() { + return httpClientMgr.getInflightMsgCnt(); + } + + private boolean isValidPkgLength(HttpEventInfo eventInfo, int allowedLen, ProcessResult procResult) { + // Not valid if the maximum limit is less than or equal to 0 + if (allowedLen < 0) { + return true; + } + int eventLen = eventInfo.getBodySize() + + eventInfo.getGroupId().length() + + eventInfo.getStreamId().length() + + String.valueOf(eventInfo.getDtMs()).length(); + // Reserve space for attribute + if (eventLen > allowedLen - SdkConsts.RESERVED_ATTRIBUTE_LENGTH) { + String errMsg = String.format("OverMaxLen: content length(%d) > allowedLen(%d) - fixedLen(%d)", + eventLen, allowedLen, SdkConsts.RESERVED_ATTRIBUTE_LENGTH); + if (httpExceptCnt.shouldPrint()) { + logger.warn(errMsg); + } + return procResult.setFailResult(ErrorCode.REPORT_INFO_EXCEED_MAX_LEN, errMsg); + } + return true; + } + + private void validParamsNotNull(HttpEventInfo eventInfo, ProcessResult procResult) { + if (eventInfo == null) { + throw new NullPointerException("eventInfo is null"); + } + if (procResult == null) { + throw new NullPointerException("procResult is null"); + } + } + + private void validParamsNotNull(HttpEventInfo eventInfo, MsgSendCallback callback, ProcessResult procResult) { + if (eventInfo == null) { + throw new NullPointerException("eventInfo is null"); + } + if (callback == null) { + throw new NullPointerException("callback is null"); + } + if (procResult == null) { + throw new NullPointerException("procResult is null"); + } + } + +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/HttpUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/HttpUtils.java new file mode 100644 index 0000000000..228ee59a24 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/HttpUtils.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.utils; + +import org.apache.inlong.sdk.dataproxy.common.ErrorCode; +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; + +import org.apache.http.client.config.RequestConfig; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.ssl.SSLContexts; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; + +/** + * Http(s) Utils class + * + * Used to place public processing functions related to HTTP(s) + */ +public class HttpUtils { + + private static final Logger logger = LoggerFactory.getLogger(HttpUtils.class); + private static final LogCounter exceptCnt = new LogCounter(10, 200000, 60 * 1000L); + + public static boolean constructHttpClient(boolean rptByHttps, + int socketTimeoutMs, int conTimeoutMs, String tlsVer, ProcessResult procResult) { + CloseableHttpClient httpClient; + RequestConfig requestConfig = RequestConfig.custom() + .setSocketTimeout(socketTimeoutMs) + .setConnectTimeout(conTimeoutMs).build(); + try { + if (rptByHttps) { + SSLContext sslContext = SSLContexts.custom().build(); + SSLConnectionSocketFactory sslSf = new SSLConnectionSocketFactory(sslContext, + new String[]{tlsVer}, null, + SSLConnectionSocketFactory.getDefaultHostnameVerifier()); + httpClient = HttpClients.custom() + .setDefaultRequestConfig(requestConfig) + .setSSLSocketFactory(sslSf).build(); + } else { + httpClient = HttpClientBuilder.create() + .setDefaultRequestConfig(requestConfig).build(); + } + return procResult.setSuccess(httpClient); + } catch (Throwable ex) { + if (exceptCnt.shouldPrint()) { + logger.error("Build http client exception", ex); + } + return procResult.setFailResult(ErrorCode.HTTP_BUILD_CLIENT_EXCEPTION, ex.getMessage()); + } + } +} From b6782c9a3695e10761a89c522287aedb3ee3f3f9 Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Wed, 22 Jan 2025 16:35:44 +0800 Subject: [PATCH 2/2] [INLONG-11706][SDK] Optimize HTTP Sender implementation --- .../sdk/dataproxy/common/HttpContentType.java | 27 ------- .../example/InLongFactoryExample.java | 38 ++++++++++ .../example/InLongHttpClientExample.java | 73 +++++++++++++++++++ 3 files changed, 111 insertions(+), 27 deletions(-) delete mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/HttpContentType.java create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongHttpClientExample.java diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/HttpContentType.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/HttpContentType.java deleted file mode 100644 index 0db9261c70..0000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/HttpContentType.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.common; - -/** - * HTTP Report Content Type enum - * - * This enumeration defines of HTTP reporting supported content types. - */ -public enum HttpContentType { - APPLICATION_X_WWW_FORM_URLENCODED -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java index d1c8b8e19c..c9e438df3c 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java @@ -21,6 +21,8 @@ import org.apache.inlong.sdk.dataproxy.MsgSenderMultiFactory; import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory; import org.apache.inlong.sdk.dataproxy.common.ProcessResult; +import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig; +import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; @@ -74,6 +76,22 @@ public static void main(String[] args) throws Exception { ProxyUtils.sleepSomeTime(10000L); tcpMsgSender.close(); + // report data by http + HttpMsgSenderConfig httpMsgSenderConfig = new HttpMsgSenderConfig( + false, managerIp, managerPort, groupId, secretId, secretKey); + InLongHttpMsgSender httpMsgSender = + singleFactory.genHttpSenderByGroupId(httpMsgSenderConfig); + if (!httpMsgSender.start(procResult)) { + System.out.println("Start http sender failure: process result=" + procResult); + } + ExampleUtils.sendHttpMessages(httpMsgSender, false, false, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + ExampleUtils.sendHttpMessages(httpMsgSender, false, true, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + ProxyUtils.sleepSomeTime(10000L); + httpMsgSender.close(); + System.out.println("Cur singleton factory sender count is " + singleFactory.getMsgSenderCount()); + // report data use multi-factory MsgSenderMultiFactory multiFactory1 = new MsgSenderMultiFactory(); MsgSenderMultiFactory multiFactory2 = new MsgSenderMultiFactory(); @@ -104,6 +122,26 @@ public static void main(String[] args) throws Exception { System.out.println("Multi-1.2 Cur multiFactory1 sender count = " + multiFactory1.getMsgSenderCount() + ", cur multiFactory2 sender count is " + multiFactory2.getMsgSenderCount()); + // report data by http + InLongHttpMsgSender httpMsgSender1 = + multiFactory1.genHttpSenderByGroupId(httpMsgSenderConfig); + HttpMsgSenderConfig httpConfg2 = new HttpMsgSenderConfig(false, + managerIp, managerPort, groupId, secretId, secretKey); + InLongHttpMsgSender httpMsgSender2 = + multiFactory2.genHttpSenderByGroupId(httpConfg2); + ExampleUtils.sendHttpMessages(httpMsgSender1, false, false, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + ExampleUtils.sendHttpMessages(httpMsgSender2, false, true, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + ProxyUtils.sleepSomeTime(10000L); + httpMsgSender1.close(); + System.out.println("Multi-2.1 Cur multiFactory1 sender count = " + + multiFactory1.getMsgSenderCount() + + ", cur multiFactory2 sender count is " + multiFactory2.getMsgSenderCount()); + httpMsgSender2.close(); + System.out.println("Multi-2.2 Cur multiFactory1 sender count = " + + multiFactory1.getMsgSenderCount() + + ", cur multiFactory2 sender count is " + multiFactory2.getMsgSenderCount()); // test self DefineFactory ThreadFactory selfDefineFactory = new DefaultThreadFactory("test_self_thread_factory"); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongHttpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongHttpClientExample.java new file mode 100644 index 0000000000..fccdac4a5c --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongHttpClientExample.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.example; + +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; +import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig; +import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InLongHttpClientExample { + + protected static final Logger logger = LoggerFactory.getLogger(InLongHttpClientExample.class); + + public static void main(String[] args) throws Exception { + + String managerIp = args[0]; + String managerPort = args[1]; + String groupId = args[2]; + String streamId = args[3]; + String secretId = args[4]; + String secretKey = args[5]; + int reqCnt = Integer.parseInt(args[6]); + int msgSize = 1024; + int msgCnt = 1; + if (args.length > 7) { + msgSize = Integer.parseInt(args[7]); + msgCnt = Integer.parseInt(args[8]); + } + + String managerAddr = "http://" + managerIp + ":" + managerPort; + + HttpMsgSenderConfig dataProxyConfig = + new HttpMsgSenderConfig(managerAddr, groupId, secretId, secretKey); + InLongHttpMsgSender messageSender = new InLongHttpMsgSender(dataProxyConfig); + + ProcessResult procResult = new ProcessResult(); + if (!messageSender.start(procResult)) { + System.out.println("Start http sender failure: process result=" + procResult); + } + + System.out.println("InLongHttpMsgSender start, nodes=" + + messageSender.getProxyNodeInfos()); + + ExampleUtils.sendHttpMessages(messageSender, true, false, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + ExampleUtils.sendHttpMessages(messageSender, true, true, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + ExampleUtils.sendHttpMessages(messageSender, false, false, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + ExampleUtils.sendHttpMessages(messageSender, false, true, + groupId, streamId, reqCnt, msgSize, msgCnt, procResult); + + ProxyUtils.sleepSomeTime(10000L); + } +}