Skip to content

Commit

Permalink
[INLONG-11702][SDK] Optimize Sender factory implementation (#11705)
Browse files Browse the repository at this point in the history
Co-authored-by: gosonzhang <[email protected]>
  • Loading branch information
gosonzhang and gosonzhang authored Jan 22, 2025
1 parent 16084cc commit 9493c04
Show file tree
Hide file tree
Showing 16 changed files with 1,182 additions and 70 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy;

import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* Base Message Sender Factory
*
* Used to manage the instance relationship of the sender factory.
* Since both singleton and multiple instances involve the same relationship,
* they are abstracted and maintained separately here.
*/
public class BaseMsgSenderFactory {

private static final Logger logger = LoggerFactory.getLogger(BaseMsgSenderFactory.class);
private static final LogCounter exptCounter = new LogCounter(10, 100000, 60 * 1000L);
// msg send factory
private final MsgSenderFactory msgSenderFactory;
private final String factoryNo;
// for senders
private final ReentrantReadWriteLock senderCacheLock = new ReentrantReadWriteLock();
// for inlong groupId -- Sender map
private final ConcurrentHashMap<String, BaseSender> groupIdSenderMap = new ConcurrentHashMap<>();
// for inlong clusterId -- Sender map
private final ConcurrentHashMap<String, BaseSender> clusterIdSenderMap = new ConcurrentHashMap<>();

public BaseMsgSenderFactory(MsgSenderFactory msgSenderFactory, String factoryNo) {
this.msgSenderFactory = msgSenderFactory;
this.factoryNo = factoryNo;
logger.info("MsgSenderFactory({}) started", this.factoryNo);
}

public void close() {
int totalSenderCnt;
int totalTDBankCnt;
senderCacheLock.writeLock().lock();
try {
// release groupId mapped senders
totalSenderCnt = innReleaseAllGroupIdSenders(groupIdSenderMap);
// release clusterId mapped senders
totalSenderCnt += innReleaseAllClusterIdSenders(clusterIdSenderMap);
} finally {
senderCacheLock.writeLock().unlock();
}
logger.info("MsgSenderFactory({}) closed, release {} inlong senders",
this.factoryNo, totalSenderCnt);
}

public void removeClient(BaseSender msgSender) {
if (msgSender == null
|| msgSender.getSenderFactory() == null
|| msgSender.getSenderFactory() != msgSenderFactory) {
return;
}
boolean removed;
String senderId = msgSender.getSenderId();
senderCacheLock.writeLock().lock();
try {
if (msgSender.getFactoryClusterIdKey() == null) {
removed = innRemoveGroupIdSender(msgSender, groupIdSenderMap);
} else {
removed = innRemoveClusterIdSender(msgSender, clusterIdSenderMap);
}
} finally {
senderCacheLock.writeLock().unlock();
}
if (removed) {
logger.info("MsgSenderFactory({}) removed sender({})", this.factoryNo, senderId);
}
}

public int getMsgSenderCount() {
return groupIdSenderMap.size() + clusterIdSenderMap.size();
}

public InLongTcpMsgSender genTcpSenderByGroupId(
TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException {
ProxyUtils.validProxyConfigNotNull(configure);
// query cached sender
String metaConfigKey = configure.getGroupMetaConfigKey();
InLongTcpMsgSender messageSender =
(InLongTcpMsgSender) groupIdSenderMap.get(metaConfigKey);
if (messageSender != null) {
return messageSender;
}
// valid configure info
ProcessResult procResult = new ProcessResult();
qryProxyMetaConfigure(configure, procResult);
// generate sender
senderCacheLock.writeLock().lock();
try {
// re-get the created sender based on the groupId key after locked
messageSender = (InLongTcpMsgSender) groupIdSenderMap.get(metaConfigKey);
if (messageSender != null) {
return messageSender;
}
// build a new sender based on groupId
messageSender = new InLongTcpMsgSender(configure, selfDefineFactory, msgSenderFactory, null);
if (!messageSender.start(procResult)) {
messageSender.close();
throw new ProxySdkException("Failed to start groupId sender: " + procResult);
}
groupIdSenderMap.put(metaConfigKey, messageSender);
logger.info("MsgSenderFactory({}) generated a new groupId({}) sender({})",
this.factoryNo, metaConfigKey, messageSender.getSenderId());
return messageSender;
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
logger.warn("MsgSenderFactory({}) build groupId sender({}) exception",
this.factoryNo, metaConfigKey, ex);
}
throw new ProxySdkException("Failed to build groupId sender: " + ex.getMessage());
} finally {
senderCacheLock.writeLock().unlock();
}
}

public InLongTcpMsgSender genTcpSenderByClusterId(
TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException {
ProxyUtils.validProxyConfigNotNull(configure);
// get groupId's clusterIdKey
ProcessResult procResult = new ProcessResult();
ProxyConfigEntry proxyConfigEntry = qryProxyMetaConfigure(configure, procResult);;
String clusterIdKey = ProxyUtils.buildClusterIdKey(
configure.getDataRptProtocol(), configure.getRegionName(), proxyConfigEntry.getClusterId());
// get local built sender
InLongTcpMsgSender messageSender = (InLongTcpMsgSender) clusterIdSenderMap.get(clusterIdKey);
if (messageSender != null) {
return messageSender;
}
// generate sender
senderCacheLock.writeLock().lock();
try {
// re-get the created sender based on the clusterId Key after locked
messageSender = (InLongTcpMsgSender) clusterIdSenderMap.get(clusterIdKey);
if (messageSender != null) {
return messageSender;
}
// build a new sender based on clusterId Key
messageSender = new InLongTcpMsgSender(configure,
selfDefineFactory, msgSenderFactory, clusterIdKey);
if (!messageSender.start(procResult)) {
messageSender.close();
throw new ProxySdkException("Failed to start cluster sender: " + procResult);
}
clusterIdSenderMap.put(clusterIdKey, messageSender);
logger.info("MsgSenderFactory({}) generated a new clusterId({}) sender({})",
this.factoryNo, clusterIdKey, messageSender.getSenderId());
return messageSender;
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
logger.warn("MsgSenderFactory({}) build cluster sender({}) exception",
this.factoryNo, clusterIdKey, ex);
}
throw new ProxySdkException("Failed to build cluster sender: " + ex.getMessage());
} finally {
senderCacheLock.writeLock().unlock();
}
}

private ProxyConfigEntry qryProxyMetaConfigure(
ProxyClientConfig proxyConfig, ProcessResult procResult) throws ProxySdkException {
ProxyConfigManager inlongMetaQryMgr = new ProxyConfigManager(proxyConfig);
// check whether valid configure
if (!inlongMetaQryMgr.getGroupIdConfigure(true, procResult)) {
throw new ProxySdkException("Failed to query remote group config: " + procResult);
}
if (proxyConfig.isEnableReportEncrypt()
&& !inlongMetaQryMgr.getEncryptConfigure(true, procResult)) {
throw new ProxySdkException("Failed to query remote encrypt config: " + procResult);
}
return inlongMetaQryMgr.getProxyConfigEntry();
}

private boolean innRemoveGroupIdSender(BaseSender msgSender, Map<String, BaseSender> senderMap) {
BaseSender tmpSender = senderMap.get(msgSender.getMetaConfigKey());
if (tmpSender == null
|| !tmpSender.getSenderId().equals(msgSender.getSenderId())) {
return false;
}
return senderMap.remove(msgSender.getMetaConfigKey()) != null;
}

private boolean innRemoveClusterIdSender(BaseSender msgSender, Map<String, BaseSender> senderMap) {
BaseSender tmpSender = senderMap.get(msgSender.getFactoryClusterIdKey());
if (tmpSender == null
|| !tmpSender.getSenderId().equals(msgSender.getSenderId())) {
return false;
}
return senderMap.remove(msgSender.getFactoryClusterIdKey()) != null;
}

private int innReleaseAllGroupIdSenders(Map<String, BaseSender> senderMap) {
int totalSenderCnt = 0;
for (Map.Entry<String, BaseSender> entry : senderMap.entrySet()) {
if (entry == null || entry.getValue() == null) {
continue;
}
try {
entry.getValue().close();
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
logger.warn("MsgSenderFactory({}) close groupId({})'s sender failure",
this.factoryNo, entry.getKey(), ex);
}
}
totalSenderCnt++;
}
senderMap.clear();
return totalSenderCnt;
}

private int innReleaseAllClusterIdSenders(Map<String, BaseSender> senderMap) {
int totalSenderCnt = 0;
for (Map.Entry<String, BaseSender> entry : senderMap.entrySet()) {
if (entry == null
|| entry.getKey() == null
|| entry.getValue() == null) {
continue;
}
try {
entry.getValue().close();
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
logger.warn("MsgSenderFactory({}) close clusterId({})'s sender failure",
this.factoryNo, entry.getKey(), ex);
}
}
totalSenderCnt++;
}
senderMap.clear();
return totalSenderCnt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

@Deprecated
/**
* Replace by InLongTcpMsgSender
*/
public class DefaultMessageSender implements MessageSender {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMessageSender.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;

@Deprecated
public interface MessageSender {

void close();
Expand Down

This file was deleted.

Loading

0 comments on commit 9493c04

Please sign in to comment.