diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index 7ff8bd77e03..19c276238a9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -20,7 +20,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; @@ -30,7 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.exception.MQClientException; @@ -59,12 +57,15 @@ public class AsyncTraceDispatcher implements TraceDispatcher { private final int queueSize; private final int batchSize; private final int maxMsgSize; + private final long pollingTimeMil; + private final long waitTimeThresholdMil; private final DefaultMQProducer traceProducer; private final ThreadPoolExecutor traceExecutor; // The last discard number of log private AtomicLong discardCount; private Thread worker; private final ArrayBlockingQueue traceContextQueue; + private final HashMap taskQueueByTopic; private ArrayBlockingQueue appenderQueue; private volatile Thread shutDownHook; private volatile boolean stopped = false; @@ -72,9 +73,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher { private DefaultMQPushConsumerImpl hostConsumer; private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private String dispatcherId = UUID.randomUUID().toString(); - private String traceTopicName; + private volatile String traceTopicName; private AtomicBoolean isStarted = new AtomicBoolean(false); - private AccessChannel accessChannel = AccessChannel.LOCAL; + private volatile AccessChannel accessChannel = AccessChannel.LOCAL; private String group; private Type type; @@ -83,8 +84,11 @@ public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCH this.queueSize = 2048; this.batchSize = 100; this.maxMsgSize = 128000; + this.pollingTimeMil = 100; + this.waitTimeThresholdMil = 500; this.discardCount = new AtomicLong(0L); this.traceContextQueue = new ArrayBlockingQueue(1024); + this.taskQueueByTopic = new HashMap(); this.group = group; this.type = type; @@ -243,113 +247,137 @@ class AsyncRunnable implements Runnable { @Override public void run() { while (!stopped) { - List contexts = new ArrayList(batchSize); synchronized (traceContextQueue) { - for (int i = 0; i < batchSize; i++) { - TraceContext context = null; + long endTime = System.currentTimeMillis() + pollingTimeMil; + while (System.currentTimeMillis() < endTime) { try { - //get trace data element from blocking Queue - traceContextQueue - context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - } - if (context != null) { - contexts.add(context); - } else { - break; + TraceContext traceContext = traceContextQueue.poll( + endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS + ); + + if (traceContext != null && !traceContext.getTraceBeans().isEmpty()) { + // get the topic which the trace message will send to + String traceTopicName = this.getTraceTopicName(traceContext.getRegionId()); + + // get the traceDataSegment which will save this trace message, create if null + TraceDataSegment traceDataSegment = taskQueueByTopic.get(traceTopicName); + if (traceDataSegment == null) { + traceDataSegment = new TraceDataSegment(traceTopicName, traceContext.getRegionId()); + taskQueueByTopic.put(traceTopicName, traceDataSegment); + } + + // encode traceContext and save it into traceDataSegment + // NOTE if data size in traceDataSegment more than maxMsgSize, + // a AsyncDataSendTask will be created and submitted + TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(traceContext); + traceDataSegment.addTraceTransferBean(traceTransferBean); + } + } catch (InterruptedException ignore) { + log.debug("traceContextQueue#poll exception"); } } - if (contexts.size() > 0) { - AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); - traceExecutor.submit(request); - } else if (AsyncTraceDispatcher.this.stopped) { + + // NOTE send the data in traceDataSegment which the first TraceTransferBean + // is longer than waitTimeThreshold + sendDataByTimeThreshold(); + + if (AsyncTraceDispatcher.this.stopped) { this.stopped = true; } } } } - } - class AsyncAppenderRequest implements Runnable { - List contextList; + private void sendDataByTimeThreshold() { + long now = System.currentTimeMillis(); + for (TraceDataSegment taskInfo : taskQueueByTopic.values()) { + if (now - taskInfo.firstBeanAddTime >= waitTimeThresholdMil) { + taskInfo.sendAllData(); + } + } + } - public AsyncAppenderRequest(final List contextList) { - if (contextList != null) { - this.contextList = contextList; - } else { - this.contextList = new ArrayList(1); + private String getTraceTopicName(String regionId) { + AccessChannel accessChannel = AsyncTraceDispatcher.this.getAccessChannel(); + if (AccessChannel.CLOUD == accessChannel) { + return TraceConstants.TRACE_TOPIC_PREFIX + regionId; } + + return AsyncTraceDispatcher.this.getTraceTopicName(); } + } - @Override - public void run() { - sendTraceData(contextList); + class TraceDataSegment { + private long firstBeanAddTime; + private int currentMsgSize; + private final String traceTopicName; + private final String regionId; + private final List traceTransferBeanList = new ArrayList(); + + TraceDataSegment(String traceTopicName, String regionId) { + this.traceTopicName = traceTopicName; + this.regionId = regionId; } - public void sendTraceData(List contextList) { - Map> transBeanMap = new HashMap>(); - for (TraceContext context : contextList) { - if (context.getTraceBeans().isEmpty()) { - continue; - } - // Topic value corresponding to original message entity content - String topic = context.getTraceBeans().get(0).getTopic(); - String regionId = context.getRegionId(); - // Use original message entity's topic as key - String key = topic; - if (!StringUtils.isBlank(regionId)) { - key = key + TraceConstants.CONTENT_SPLITOR + regionId; - } - List transBeanList = transBeanMap.get(key); - if (transBeanList == null) { - transBeanList = new ArrayList(); - transBeanMap.put(key, transBeanList); - } - TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context); - transBeanList.add(traceData); - } - for (Map.Entry> entry : transBeanMap.entrySet()) { - String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR)); - String dataTopic = entry.getKey(); - String regionId = null; - if (key.length > 1) { - dataTopic = key[0]; - regionId = key[1]; - } - flushData(entry.getValue(), dataTopic, regionId); + public void addTraceTransferBean(TraceTransferBean traceTransferBean) { + initFirstBeanAddTime(); + this.traceTransferBeanList.add(traceTransferBean); + this.currentMsgSize += traceTransferBean.getTransData().length(); + if (currentMsgSize >= traceProducer.getMaxMessageSize()) { + List dataToSend = new ArrayList(traceTransferBeanList); + AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend); + traceExecutor.submit(asyncDataSendTask); + + this.clear(); } } - /** - * Batch sending data actually - */ - private void flushData(List transBeanList, String dataTopic, String regionId) { - if (transBeanList.size() == 0) { + public void sendAllData() { + if (this.traceTransferBeanList.isEmpty()) { return; } - // Temporary buffer + List dataToSend = new ArrayList(traceTransferBeanList); + AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend); + traceExecutor.submit(asyncDataSendTask); + + this.clear(); + } + + private void initFirstBeanAddTime() { + if (firstBeanAddTime == 0) { + firstBeanAddTime = System.currentTimeMillis(); + } + } + + private void clear() { + this.firstBeanAddTime = 0; + this.currentMsgSize = 0; + this.traceTransferBeanList.clear(); + } + } + + + class AsyncDataSendTask implements Runnable { + private final String traceTopicName; + private final String regionId; + private final List traceTransferBeanList; + + public AsyncDataSendTask(String traceTopicName, String regionId, List traceTransferBeanList) { + this.traceTopicName = traceTopicName; + this.regionId = regionId; + this.traceTransferBeanList = traceTransferBeanList; + } + + @Override + public void run() { StringBuilder buffer = new StringBuilder(1024); - int count = 0; Set keySet = new HashSet(); - - for (TraceTransferBean bean : transBeanList) { - // Keyset of message trace includes msgId of or original message + for (TraceTransferBean bean : traceTransferBeanList) { keySet.addAll(bean.getTransKey()); buffer.append(bean.getTransData()); - count++; - // Ensure that the size of the package should not exceed the upper limit. - if (buffer.length() >= traceProducer.getMaxMessageSize()) { - sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId); - // Clear temporary buffer after finishing - buffer.delete(0, buffer.length()); - keySet.clear(); - count = 0; - } - } - if (count > 0) { - sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId); } - transBeanList.clear(); + sendTraceDataByMQ(keySet, buffer.toString(), traceTopicName); } /** @@ -357,12 +385,9 @@ private void flushData(List transBeanList, String dataTopic, * * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId) * @param data the message trace data in this batch + * @param traceTopic the topic which message trace data will send to */ - private void sendTraceDataByMQ(Set keySet, final String data, String dataTopic, String regionId) { - String traceTopic = traceTopicName; - if (AccessChannel.CLOUD == accessChannel) { - traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId; - } + private void sendTraceDataByMQ(Set keySet, final String data, String traceTopic) { final Message message = new Message(traceTopic, data.getBytes()); // Keyset of message trace includes msgId of or original message message.setKeys(keySet);