diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java index 7ca74fb6037..29ebcc75b07 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java @@ -21,11 +21,13 @@ import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.message.ProxyMessage; import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.DateTransUtils; import org.apache.inlong.common.msg.AttributeConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.text.ParseException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -40,6 +42,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT; /** * Handle List of Proxy Message, which belong to the same stream id. @@ -62,6 +65,7 @@ public class ProxyMessageCache { private final AtomicLong cacheSize = new AtomicLong(0); private Long packageIndex = 0L; private long lastPrintTime = 0; + private long dataTime; /** * extra map used when sending to dataproxy */ @@ -78,6 +82,12 @@ public ProxyMessageCache(InstanceProfile instanceProfile, String groupId, String this.groupId = groupId; this.streamId = streamId; this.inodeInfo = instanceProfile.get(TaskConstants.INODE_INFO); + try { + dataTime = DateTransUtils.timeStrConvertTomillSec(instanceProfile.getDataTime(), + instanceProfile.get(TASK_CYCLE_UNIT)); + } catch (ParseException e) { + LOGGER.info("trans dataTime error", e); + } extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false"); extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields())); } @@ -163,7 +173,7 @@ public SenderMessage fetchSenderMessage() { if (!bodyList.isEmpty()) { PackageAckInfo ackInfo = new PackageAckInfo(packageIndex, packageOffset, resultBatchSize, false); SenderMessage senderMessage = new SenderMessage(taskId, instanceId, groupId, streamId, bodyList, - AgentUtils.getCurrentTime(), extraMap, ackInfo); + dataTime, extraMap, ackInfo); packageIndex++; return senderMessage; } diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java index ced881c3f78..6a79222a2a5 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java @@ -44,7 +44,6 @@ public static long timeStrConvertTomillSec(String time, String cycleUnit) public static long timeStrConvertTomillSec(String time, String cycleUnit, TimeZone timeZone) throws ParseException { long retTime = 0; - // SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat df = null; if (cycleUnit.equals("Y") && time.length() == 4) { df = new SimpleDateFormat("yyyy"); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index c7835cd3552..a80ce8b53b2 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -160,16 +160,15 @@ private Runnable coreThread() { private void printInstanceDetail() { if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_TIME) { - LOGGER.info("instanceManager coreThread running! taskId {} action count {}", taskId, - actionQueue.size()); List instances = instanceDb.getInstances(taskId); InstancePrintStat stat = new InstancePrintStat(); for (int i = 0; i < instances.size(); i++) { InstanceProfile instance = instances.get(i); stat.stat(instance.getState()); } - LOGGER.info("instanceManager coreThread running! taskId {} memory total {} db total {} db detail {} ", - taskId, instanceMap.size(), instances.size(), stat); + LOGGER.info( + "instanceManager running! taskId {} mem {} db total {} {} action count {}", + taskId, instanceMap.size(), instances.size(), stat, actionQueue.size()); lastPrintTime = AgentUtils.getCurrentTime(); } } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java index 6724247c771..29fb5633f6c 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java @@ -203,8 +203,7 @@ private void printTaskDetail() { TaskProfile task = tasksInDb.get(i); stat.stat(task.getState()); } - LOGGER.info("taskManager coreThread running! memory total {} db total {} db detail {} ", taskMap.size(), - tasksInDb.size(), stat); + LOGGER.info("taskManager running! mem {} db total {} {} ", taskMap.size(), tasksInDb.size(), stat); lastPrintTime = AgentUtils.getCurrentTime(); } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index d13ef199639..bea53a1e9d2 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -331,11 +331,11 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) { } private void asyncSendByMessageSender(SendMessageCallback cb, - List bodyList, String groupId, String streamId, long dt, String msgUUID, + List bodyList, String groupId, String streamId, long dataTime, String msgUUID, long timeout, TimeUnit timeUnit, Map extraAttrMap, boolean isProxySend) throws ProxysdkException { sender.asyncSendMessage(cb, bodyList, groupId, - streamId, dt, msgUUID, + streamId, dataTime, msgUUID, timeout, timeUnit, extraAttrMap, isProxySend); } @@ -428,7 +428,7 @@ public void onMessageAck(SendResult result) { message.getAckInfo().setHasAck(true); getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, - System.currentTimeMillis(), message.getMsgCnt(), message.getTotalSize()); + dataTime, message.getMsgCnt(), message.getTotalSize()); } else { LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, " + "error {}", groupId, streamId, taskId, instanceId, dataTime, retry, result); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 089f0fee75a..cbc1cff372c 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -34,6 +34,7 @@ import org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider; import org.apache.inlong.agent.plugin.utils.file.FileDataUtils; import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.DateTransUtils; import com.google.gson.Gson; import lombok.AllArgsConstructor; @@ -80,6 +81,7 @@ import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_SOURCE_IP; import static org.apache.inlong.agent.constant.TaskConstants.JOB_FILE_META_ENV_LIST; import static org.apache.inlong.agent.constant.TaskConstants.OFFSET; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT; /** * Read text files @@ -106,9 +108,9 @@ private class SourceData { private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000; private final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT; private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024; - private final Integer FINISH_READ_MAX_COUNT = 30; + private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 30; private final Long INODE_UPDATE_INTERVAL_MS = 1000L; - private final Integer READ_WAIT_TIMEOUT_MS = 1000; + private final Integer READ_WAIT_TIMEOUT_MS = 10; private final SimpleDateFormat RECORD_TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); public InstanceProfile profile; private String taskId; @@ -125,11 +127,12 @@ private class SourceData { private BlockingQueue queue; private final Gson GSON = new Gson(); private volatile boolean runnable = true; - private volatile int readEndCount = 0; private volatile boolean fileExist = true; private String inodeInfo; private volatile long lastInodeUpdateTime = 0; private volatile boolean running = false; + private long dataTime = 0; + private volatile long emptyCount = 0; public LogFileSource() { OffsetManager.init(); @@ -153,6 +156,8 @@ public void init(InstanceProfile profile) { linePosition = getInitLineOffset(isIncrement, taskId, instanceId, inodeInfo); bytePosition = getBytePositionByLine(linePosition); queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE); + dataTime = DateTransUtils.timeStrConvertTomillSec(profile.getDataTime(), + profile.get(TASK_CYCLE_UNIT)); try { registerMeta(profile); } catch (Exception ex) { @@ -249,7 +254,7 @@ private long getBytePositionByLine(long linePosition) throws IOException { } } } catch (Exception e) { - LOGGER.error("getBytePositionByLine error {}", e.getStackTrace()); + LOGGER.error("getBytePositionByLine error: ", e); } finally { if (input != null) { input.close(); @@ -344,7 +349,7 @@ public Message read() { private Message createMessage(SourceData sourceData) { String msgWithMetaData = fillMetaData(sourceData.data); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, - System.currentTimeMillis(), 1, msgWithMetaData.length()); + dataTime, 1, msgWithMetaData.length()); String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId)); Map header = new HashMap<>(); header.put(PROXY_KEY_DATA, proxyPartitionKey); @@ -429,6 +434,17 @@ public Runnable coreThread() { } catch (IOException e) { LOGGER.error("readFromPos error {}", e.getMessage()); } + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); + if (lines.isEmpty()) { + if (queue.isEmpty()) { + emptyCount++; + } else { + emptyCount = 0; + } + AgentUtils.silenceSleepInSeconds(1); + continue; + } + emptyCount = 0; for (int i = 0; i < lines.size(); i++) { boolean suc4Queue = waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).data.length()); if (!suc4Queue) { @@ -436,17 +452,10 @@ public Runnable coreThread() { } putIntoQueue(lines.get(i)); } - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); - if (lines.isEmpty()) { - readEndCount++; - AgentUtils.silenceSleepInSeconds(1); - } else { - readEndCount = 0; - if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { - lastPrintTime = AgentUtils.getCurrentTime(); - LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}, reads lines size {}", - file.getName(), linePosition, bytePosition, file.length(), lines.size()); - } + if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { + lastPrintTime = AgentUtils.getCurrentTime(); + LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}, reads lines size {}", + file.getName(), linePosition, bytePosition, file.length(), lines.size()); } } running = false; @@ -530,15 +539,7 @@ private void clearQueue(BlockingQueue queue) { @Override public boolean sourceFinish() { - if (finishReadLog() && queue.isEmpty()) { - return true; - } else { - return false; - } - } - - public boolean finishReadLog() { - return readEndCount > FINISH_READ_MAX_COUNT; + return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST; } @Override diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index b2a3b605b78..67ce1dec765 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -73,7 +73,7 @@ private LogFileSource getSource() { Whitebox.setInternalState(source, "BATCH_READ_LINE_TOTAL_LEN", 10); Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS", 0); Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 2); - Whitebox.setInternalState(source, "FINISH_READ_MAX_COUNT", 1); + Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3); Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10); source.init(instanceProfile); return source; @@ -101,10 +101,7 @@ private void testFullRead() { srcLen += check[i].getBytes(StandardCharsets.UTF_8).length; } LogFileSource source = getSource(); - await().atMost(2, TimeUnit.SECONDS).until(() -> source.finishReadLog()); int cnt = 0; - int leftBeforeRead = MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT); - Assert.assertTrue(leftBeforeRead + srcLen == DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT); Message msg = source.read(); int readLen = 0; while (msg != null) { @@ -114,7 +111,7 @@ private void testFullRead() { msg = source.read(); cnt++; } - await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish()); + await().atMost(6, TimeUnit.SECONDS).until(() -> source.sourceFinish()); source.destroy(); Assert.assertTrue(cnt == 3); Assert.assertTrue(srcLen == readLen); @@ -124,7 +121,6 @@ private void testFullRead() { private void testCleanQueue() { LogFileSource source = getSource(); - await().atMost(2, TimeUnit.SECONDS).until(() -> source.finishReadLog()); for (int i = 0; i < 2; i++) { source.read(); }