Skip to content

Commit

Permalink
[ISSUE #8681] fix trace topic name (#8680)
Browse files Browse the repository at this point in the history
* fix trace topic
  • Loading branch information
yuz10 authored Sep 19, 2024
1 parent d12635d commit 3e81fae
Showing 1 changed file with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -302,14 +302,24 @@ public void run() {

public void sendTraceData(List<TraceContext> contextList) {
Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<>(16);
String currentRegionId;
String traceTopic;
for (TraceContext context : contextList) {
currentRegionId = context.getRegionId();
AccessChannel accessChannel = context.getAccessChannel();
if (accessChannel == null) {
accessChannel = AsyncTraceDispatcher.this.accessChannel;
}
String currentRegionId = context.getRegionId();
if (currentRegionId == null || context.getTraceBeans().isEmpty()) {
continue;
}
if (AccessChannel.CLOUD == accessChannel) {
traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + currentRegionId;
} else {
traceTopic = traceTopicName;
}

String topic = context.getTraceBeans().get(0).getTopic();
String key = topic + TraceConstants.CONTENT_SPLITOR + currentRegionId;
String key = topic + TraceConstants.CONTENT_SPLITOR + traceTopic;
List<TraceTransferBean> transBeanList = transBeanMap.computeIfAbsent(key, k -> new ArrayList<>());
TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
transBeanList.add(traceData);
Expand All @@ -320,7 +330,7 @@ public void sendTraceData(List<TraceContext> contextList) {
}
}

private void flushData(List<TraceTransferBean> transBeanList, String topic, String currentRegionId) {
private void flushData(List<TraceTransferBean> transBeanList, String topic, String traceTopic) {
if (transBeanList.size() == 0) {
return;
}
Expand All @@ -332,14 +342,14 @@ private void flushData(List<TraceTransferBean> transBeanList, String topic, Stri
buffer.append(bean.getTransData());
count++;
if (buffer.length() >= traceProducer.getMaxMessageSize()) {
sendTraceDataByMQ(keySet, buffer.toString(), TraceConstants.TRACE_TOPIC_PREFIX + currentRegionId);
sendTraceDataByMQ(keySet, buffer.toString(), traceTopic);
buffer.delete(0, buffer.length());
keySet.clear();
count = 0;
}
}
if (count > 0) {
sendTraceDataByMQ(keySet, buffer.toString(), TraceConstants.TRACE_TOPIC_PREFIX + currentRegionId);
sendTraceDataByMQ(keySet, buffer.toString(), traceTopic);
}
transBeanList.clear();
}
Expand Down

0 comments on commit 3e81fae

Please sign in to comment.