Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4099]Optimized the performance of sending traceMessage in AsyncTraceDispatcher #4180

Merged
merged 4 commits into from
Apr 27, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -30,7 +29,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException;
Expand Down Expand Up @@ -59,22 +57,25 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private final int queueSize;
private final int batchSize;
private final int maxMsgSize;
private final long pollingTimeMil;
private final long waitTimeThresholdMil;
private final DefaultMQProducer traceProducer;
private final ThreadPoolExecutor traceExecutor;
// The last discard number of log
private AtomicLong discardCount;
private Thread worker;
private final ArrayBlockingQueue<TraceContext> traceContextQueue;
private final HashMap<String, TraceDataSegment> taskQueueByTopic;
private ArrayBlockingQueue<Runnable> appenderQueue;
private volatile Thread shutDownHook;
private volatile boolean stopped = false;
private DefaultMQProducerImpl hostProducer;
private DefaultMQPushConsumerImpl hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName;
private volatile String traceTopicName;
private AtomicBoolean isStarted = new AtomicBoolean(false);
private AccessChannel accessChannel = AccessChannel.LOCAL;
private volatile AccessChannel accessChannel = AccessChannel.LOCAL;
private String group;
private Type type;

Expand All @@ -83,8 +84,11 @@ public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCH
this.queueSize = 2048;
this.batchSize = 100;
this.maxMsgSize = 128000;
this.pollingTimeMil = 100;
this.waitTimeThresholdMil = 500;
this.discardCount = new AtomicLong(0L);
this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
this.taskQueueByTopic = new HashMap();
this.group = group;
this.type = type;

Expand Down Expand Up @@ -243,126 +247,147 @@ class AsyncRunnable implements Runnable {
@Override
public void run() {
while (!stopped) {
List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
synchronized (traceContextQueue) {
for (int i = 0; i < batchSize; i++) {
TraceContext context = null;
long endTime = System.currentTimeMillis() + pollingTimeMil;
while (System.currentTimeMillis() < endTime) {
try {
//get trace data element from blocking Queue - traceContextQueue
context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (context != null) {
contexts.add(context);
} else {
break;
TraceContext traceContext = traceContextQueue.poll(
endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS
);

if (traceContext != null && !traceContext.getTraceBeans().isEmpty()) {
// get the topic which the trace message will send to
String traceTopicName = this.getTraceTopicName(traceContext.getRegionId());

// get the traceDataSegment which will save this trace message, create if null
TraceDataSegment traceDataSegment = taskQueueByTopic.get(traceTopicName);
if (traceDataSegment == null) {
traceDataSegment = new TraceDataSegment(traceTopicName, traceContext.getRegionId());
taskQueueByTopic.put(traceTopicName, traceDataSegment);
}

// encode traceContext and save it into traceDataSegment
// NOTE if data size in traceDataSegment more than maxMsgSize,
// a AsyncDataSendTask will be created and submitted
TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(traceContext);
traceDataSegment.addTraceTransferBean(traceTransferBean);
}
} catch (InterruptedException ignore) {
log.debug("traceContextQueue#poll exception");
}
}
if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
traceExecutor.submit(request);
} else if (AsyncTraceDispatcher.this.stopped) {

// NOTE send the data in traceDataSegment which the first TraceTransferBean
// is longer than waitTimeThreshold
sendDataByTimeThreshold();

if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
}
}
}

}
}

class AsyncAppenderRequest implements Runnable {
List<TraceContext> contextList;
private void sendDataByTimeThreshold() {
long now = System.currentTimeMillis();
for (TraceDataSegment taskInfo : taskQueueByTopic.values()) {
if (now - taskInfo.firstBeanAddTime >= waitTimeThresholdMil) {
taskInfo.sendAllData();
}
}
}

public AsyncAppenderRequest(final List<TraceContext> contextList) {
if (contextList != null) {
this.contextList = contextList;
} else {
this.contextList = new ArrayList<TraceContext>(1);
private String getTraceTopicName(String regionId) {
AccessChannel accessChannel = AsyncTraceDispatcher.this.getAccessChannel();
if (AccessChannel.CLOUD == accessChannel) {
return TraceConstants.TRACE_TOPIC_PREFIX + regionId;
}

return AsyncTraceDispatcher.this.getTraceTopicName();
}
}

@Override
public void run() {
sendTraceData(contextList);
class TraceDataSegment {
private long firstBeanAddTime;
private int currentMsgSize;
private final String traceTopicName;
private final String regionId;
private final List<TraceTransferBean> traceTransferBeanList = new ArrayList();

TraceDataSegment(String traceTopicName, String regionId) {
this.traceTopicName = traceTopicName;
this.regionId = regionId;
}

public void sendTraceData(List<TraceContext> contextList) {
Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
for (TraceContext context : contextList) {
if (context.getTraceBeans().isEmpty()) {
continue;
}
// Topic value corresponding to original message entity content
String topic = context.getTraceBeans().get(0).getTopic();
String regionId = context.getRegionId();
// Use original message entity's topic as key
String key = topic;
if (!StringUtils.isBlank(regionId)) {
key = key + TraceConstants.CONTENT_SPLITOR + regionId;
}
List<TraceTransferBean> transBeanList = transBeanMap.get(key);
if (transBeanList == null) {
transBeanList = new ArrayList<TraceTransferBean>();
transBeanMap.put(key, transBeanList);
}
TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
transBeanList.add(traceData);
}
for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
String dataTopic = entry.getKey();
String regionId = null;
if (key.length > 1) {
dataTopic = key[0];
regionId = key[1];
}
flushData(entry.getValue(), dataTopic, regionId);
public void addTraceTransferBean(TraceTransferBean traceTransferBean) {
initFirstBeanAddTime();
this.traceTransferBeanList.add(traceTransferBean);
this.currentMsgSize += traceTransferBean.getTransData().length();
if (currentMsgSize >= traceProducer.getMaxMessageSize()) {
List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend);
traceExecutor.submit(asyncDataSendTask);

this.clear();
}
}

/**
* Batch sending data actually
*/
private void flushData(List<TraceTransferBean> transBeanList, String dataTopic, String regionId) {
if (transBeanList.size() == 0) {
public void sendAllData() {
if (this.traceTransferBeanList.isEmpty()) {
return;
}
// Temporary buffer
List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend);
traceExecutor.submit(asyncDataSendTask);

this.clear();
}

private void initFirstBeanAddTime() {
if (firstBeanAddTime == 0) {
firstBeanAddTime = System.currentTimeMillis();
}
}

private void clear() {
this.firstBeanAddTime = 0;
this.currentMsgSize = 0;
this.traceTransferBeanList.clear();
}
}


class AsyncDataSendTask implements Runnable {
private final String traceTopicName;
private final String regionId;
private final List<TraceTransferBean> traceTransferBeanList;

public AsyncDataSendTask(String traceTopicName, String regionId, List<TraceTransferBean> traceTransferBeanList) {
this.traceTopicName = traceTopicName;
this.regionId = regionId;
this.traceTransferBeanList = traceTransferBeanList;
}

@Override
public void run() {
StringBuilder buffer = new StringBuilder(1024);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a good habit to catch the unchecked exception in runnable, if we do not get the future result, for the ThreadPool will swallow the exception, and we get no information if something unexpected happens.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a try catch in the following invoked method sendTraceDataByMQ(), is this enough?

image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, it is enough since the code outside the try-catch is simple, but it may introduce vulnerabilities if someone else adds code to it in the future.
It is better to show the best practice if you have spare time.

int count = 0;
Set<String> keySet = new HashSet<String>();

for (TraceTransferBean bean : transBeanList) {
// Keyset of message trace includes msgId of or original message
for (TraceTransferBean bean : traceTransferBeanList) {
keySet.addAll(bean.getTransKey());
buffer.append(bean.getTransData());
count++;
// Ensure that the size of the package should not exceed the upper limit.
if (buffer.length() >= traceProducer.getMaxMessageSize()) {
sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
// Clear temporary buffer after finishing
buffer.delete(0, buffer.length());
keySet.clear();
count = 0;
}
}
if (count > 0) {
sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
}
transBeanList.clear();
sendTraceDataByMQ(keySet, buffer.toString(), traceTopicName);
}

/**
* Send message trace data
*
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
* @param data the message trace data in this batch
* @param traceTopic the topic which message trace data will send to
*/
private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
String traceTopic = traceTopicName;
if (AccessChannel.CLOUD == accessChannel) {
traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
}
private void sendTraceDataByMQ(Set<String> keySet, final String data, String traceTopic) {
final Message message = new Message(traceTopic, data.getBytes());
// Keyset of message trace includes msgId of or original message
message.setKeys(keySet);
Expand Down