Skip to content

Commit

Permalink
[INLONG-11692][SDK] The metadata update function abstracted to Config…
Browse files Browse the repository at this point in the history
…Holder (#11693)

Co-authored-by: gosonzhang <[email protected]>
  • Loading branch information
gosonzhang and gosonzhang authored Jan 21, 2025
1 parent 21cbf72 commit b10872e
Show file tree
Hide file tree
Showing 23 changed files with 86 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;

import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.commons.lang3.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;

import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.config;

import java.util.List;

/**
* Configure Holder:
*
* Used to hold DataProxy meta configures
*/
public interface ConfigHolder {

void updateAllowedMaxPkgLength(int maxPkgLength);

void updateProxyNodes(boolean nodeChanged, List<HostInfo> newProxyNodes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
Expand Down Expand Up @@ -101,7 +100,7 @@ public class ProxyConfigManager extends Thread {

private final String callerId;
private final Gson gson = new Gson();
private final ClientMgr clientManager;
private final ConfigHolder configHolder;
private final ThreadLocalRandom random = ThreadLocalRandom.current();
private final AtomicBoolean shutDown = new AtomicBoolean(false);
// proxy configure info
Expand All @@ -125,13 +124,13 @@ public ProxyConfigManager(ProxyClientConfig configure) {
this("MetaQuery", configure, null);
}

public ProxyConfigManager(String callerId, ProxyClientConfig configure, ClientMgr clientManager) {
public ProxyConfigManager(String callerId, ProxyClientConfig configure, ConfigHolder configHolder) {
this.callerId = callerId;
this.clientManager = clientManager;
this.configHolder = configHolder;
if (configure != null) {
this.storeAndBuildMetaConfigure(configure);
}
if (this.clientManager != null) {
if (this.configHolder != null) {
this.setName("ConfigManager-" + this.callerId);
logger.info("ConfigManager({}) started, groupId={}",
this.callerId, mgrConfig.getInlongGroupId());
Expand All @@ -148,15 +147,15 @@ public boolean updProxyClientConfig(ProxyClientConfig configure, ProcessResult p
if (this.shutDown.get()) {
return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
if (this.clientManager != null) {
if (this.configHolder != null) {
return procResult.setFailResult(ErrorCode.ILLEGAL_CALL_STATE);
}
this.storeAndBuildMetaConfigure(configure);
return procResult.setSuccess();
}

public void shutDown() {
if (clientManager == null) {
if (this.configHolder == null) {
return;
}
if (shutDown.compareAndSet(false, true)) {
Expand Down Expand Up @@ -479,6 +478,7 @@ private void compareAndUpdateProxyList(ProxyConfigEntry proxyEntry) {
newProxyNodeList.addAll(proxyInfoList);
} else {
this.proxyConfigEntry = proxyEntry;
configHolder.updateAllowedMaxPkgLength(proxyEntry.getMaxPacketLength());
newSwitchStat = proxyEntry.getSwitchStat();
newProxyNodeList = new ArrayList<>(proxyEntry.getSize());
for (Map.Entry<String, HostInfo> entry : proxyEntry.getHostMap().entrySet()) {
Expand All @@ -491,7 +491,7 @@ private void compareAndUpdateProxyList(ProxyConfigEntry proxyEntry) {
if (nodeChanged || newSwitchStat != oldStat
|| (System.currentTimeMillis() - lstUpdateTime) >= mgrConfig.getForceReChooseInrMs()) {
proxyInfoList = newProxyNodeList;
clientManager.updateProxyInfoList(nodeChanged, proxyInfoList);
configHolder.updateProxyNodes(nodeChanged, proxyInfoList);
lstUpdateTime = System.currentTimeMillis();
oldStat = newSwitchStat;
}
Expand Down Expand Up @@ -813,6 +813,12 @@ private CloseableHttpClient getCloseableHttpClient(List<BasicNameValuePair> para

private void storeAndBuildMetaConfigure(ProxyClientConfig config) {
this.mgrConfig = config;
this.proxyConfigEntry = null;
this.proxyInfoList.clear();
this.oldStat = 0;
this.localMd5 = null;
this.lstUpdateTime = 0;
this.userEncryptConfigEntry = null;
StringBuilder strBuff = new StringBuilder(512);
this.proxyConfigVisitUrl = strBuff
.append(mgrConfig.isVisitMgrByHttps() ? SdkConsts.PREFIX_HTTPS : SdkConsts.PREFIX_HTTP)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.inlong.sdk.dataproxy.example;

import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.network.HttpProxySender;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.inlong.sdk.dataproxy.example;

import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
import org.apache.inlong.sdk.dataproxy.network.HttpMessage;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public class ClientHandler extends SimpleChannelInboundHandler<EncodeObject> {
private static final LogCounter thrownCnt = new LogCounter(10, 100000, 60 * 1000L);

private final Sender sender;
private final ClientMgr clientMgr;
private final DefClientMgr clientMgr;

public ClientHandler(Sender sender, ClientMgr clientMgr) {
public ClientHandler(Sender sender, DefClientMgr clientMgr) {
this.sender = sender;
this.clientMgr = clientMgr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@

public class ClientPipelineFactory extends ChannelInitializer<SocketChannel> {

private final ClientMgr clientMgr;
private final DefClientMgr clientMgr;
private final Sender sender;

public ClientPipelineFactory(ClientMgr clientMgr, Sender sender) {
public ClientPipelineFactory(DefClientMgr clientMgr, Sender sender) {
this.clientMgr = clientMgr;
this.sender = sender;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.inlong.sdk.dataproxy.network;

import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.ConfigHolder;
import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.EventLoopUtil;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
Expand All @@ -51,9 +52,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ClientMgr {
public class DefClientMgr implements ConfigHolder {

private static final Logger logger = LoggerFactory.getLogger(ClientMgr.class);
private static final Logger logger = LoggerFactory.getLogger(DefClientMgr.class);
private static final LogCounter logCounter = new LogCounter(10, 100000, 60 * 1000L);
private static final LogCounter updConExptCnt = new LogCounter(10, 100000, 60 * 1000L);
private static final LogCounter exptCounter = new LogCounter(10, 100000, 60 * 1000L);
Expand Down Expand Up @@ -83,14 +84,14 @@ public class ClientMgr {
/**
* Build up the connection between the server and client.
*/
public ClientMgr(TcpMsgSenderConfig tcpConfig, Sender sender) {
public DefClientMgr(TcpMsgSenderConfig tcpConfig, Sender sender) {
this(tcpConfig, sender, null);
}

/**
* Build up the connection between the server and client.
*/
public ClientMgr(TcpMsgSenderConfig tcpConfig, Sender sender, ThreadFactory selfDefineFactory) {
public DefClientMgr(TcpMsgSenderConfig tcpConfig, Sender sender, ThreadFactory selfDefineFactory) {
this.tcpConfig = tcpConfig;
this.sender = sender;
// Initialize the bootstrap
Expand Down Expand Up @@ -301,7 +302,13 @@ public void setConnectionBusy(Channel channel) {
}
}

public void updateProxyInfoList(boolean nodeChanged, List<HostInfo> newNodes) {
@Override
public void updateAllowedMaxPkgLength(int maxPkgLength) {
//
}

@Override
public void updateProxyNodes(boolean nodeChanged, List<HostInfo> newNodes) {
if (newNodes == null || newNodes.isEmpty() || !this.started.get()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.http.InternalHttpSender;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.inlong.sdk.dataproxy.network;

import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;

import io.netty.bootstrap.Bootstrap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.inlong.sdk.dataproxy.network;

import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.threads.MetricWorkerThread;
import org.apache.inlong.sdk.dataproxy.threads.TimeoutScanThread;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
Expand Down Expand Up @@ -64,7 +64,7 @@ public class Sender {
private final AtomicInteger currentBufferSize = new AtomicInteger(0);
private final TimeoutScanThread scanThread;
private final AtomicBoolean started = new AtomicBoolean(false);
private final ClientMgr clientMgr;
private final DefClientMgr clientMgr;
private final String instanceId;
private final TcpMsgSenderConfig tcpConfig;
private MetricWorkerThread metricWorker = null;
Expand All @@ -82,7 +82,7 @@ public Sender(TcpMsgSenderConfig tcpConfig, ThreadFactory selfDefineFactory) thr
this.instanceId = "sender-" + senderIdGen.incrementAndGet();
this.asyncCallbackMaxSize = tcpConfig.getTotalAsyncCallbackSize();
this.threadPool = Executors.newCachedThreadPool();
this.clientMgr = new ClientMgr(tcpConfig, this, selfDefineFactory);
this.clientMgr = new DefClientMgr(tcpConfig, this, selfDefineFactory);
this.scanThread = new TimeoutScanThread(this, tcpConfig);
if (tcpConfig.isEnableMetric()) {
metricWorker = new MetricWorkerThread(tcpConfig, this);
Expand Down Expand Up @@ -553,7 +553,7 @@ public ConcurrentHashMap<Channel, ConcurrentHashMap<String, QueueObject>> getCal
return callbacks;
}

public ClientMgr getClientMgr() {
public DefClientMgr getClientMgr() {
return clientMgr;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.http;
package org.apache.inlong.sdk.dataproxy.sender.http;

import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.sdk.dataproxy.common.HttpContentType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.Map;

/**
* HTTP Event Information class
* TCP Event Information class
*
* Used to encapsulate the data information reported by TCP
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy;
package org.apache.inlong.sdk.dataproxy.sender.tcp;

import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.inlong.sdk.dataproxy.threads;

import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
Expand All @@ -26,6 +25,7 @@
import org.apache.inlong.sdk.dataproxy.metric.MetricTimeNumSummary;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

package org.apache.inlong.sdk.dataproxy.threads;

import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.network.QueueObject;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.TimeScanObject;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;

import io.netty.channel.Channel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.inlong.sdk.dataproxy;

import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;

import org.junit.Assert;
import org.junit.Test;
Expand Down
Loading

0 comments on commit b10872e

Please sign in to comment.