Skip to content

Commit

Permalink
[INLONG-9284][Agent] Report audit by data time not real time (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang authored and doleyzi committed Nov 15, 2023
1 parent bcf9b90 commit de2b4d3
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.msg.AttributeConstants;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -40,6 +42,7 @@
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;

/**
* Handles a list of proxy messages that belong to the same stream id.
Expand All @@ -62,6 +65,7 @@ public class ProxyMessageCache {
private final AtomicLong cacheSize = new AtomicLong(0);
private Long packageIndex = 0L;
private long lastPrintTime = 0;
private long dataTime;
/**
* extra map used when sending to dataproxy
*/
Expand All @@ -78,6 +82,12 @@ public ProxyMessageCache(InstanceProfile instanceProfile, String groupId, String
this.groupId = groupId;
this.streamId = streamId;
this.inodeInfo = instanceProfile.get(TaskConstants.INODE_INFO);
try {
dataTime = DateTransUtils.timeStrConvertTomillSec(instanceProfile.getDataTime(),
instanceProfile.get(TASK_CYCLE_UNIT));
} catch (ParseException e) {
LOGGER.info("trans dataTime error", e);
}
extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields()));
}
Expand Down Expand Up @@ -163,7 +173,7 @@ public SenderMessage fetchSenderMessage() {
if (!bodyList.isEmpty()) {
PackageAckInfo ackInfo = new PackageAckInfo(packageIndex, packageOffset, resultBatchSize, false);
SenderMessage senderMessage = new SenderMessage(taskId, instanceId, groupId, streamId, bodyList,
AgentUtils.getCurrentTime(), extraMap, ackInfo);
dataTime, extraMap, ackInfo);
packageIndex++;
return senderMessage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public static long timeStrConvertTomillSec(String time, String cycleUnit)
public static long timeStrConvertTomillSec(String time, String cycleUnit, TimeZone timeZone)
throws ParseException {
long retTime = 0;
// SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
SimpleDateFormat df = null;
if (cycleUnit.equals("Y") && time.length() == 4) {
df = new SimpleDateFormat("yyyy");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,15 @@ private Runnable coreThread() {

/**
 * Logs a throttled status summary for the instances of this task.
 *
 * Nothing happens unless the value returned by AgentUtils.getCurrentTime() exceeds
 * lastPrintTime by more than CORE_THREAD_PRINT_TIME. When the check passes, the instances
 * stored for taskId are loaded from instanceDb, their states are tallied in an
 * InstancePrintStat, the totals are logged, and lastPrintTime is set to the current time.
 *
 * NOTE(review): this text comes from a rendered diff without +/- markers, so the three
 * LOGGER.info calls below are presumably the two pre-change statements shown together with
 * the single consolidated statement that replaces them - confirm against the committed file.
 */
private void printInstanceDetail() {
// Throttle: only report once the print interval has elapsed since the last report.
if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_TIME) {
LOGGER.info("instanceManager coreThread running! taskId {} action count {}", taskId,
actionQueue.size());
List<InstanceProfile> instances = instanceDb.getInstances(taskId);
InstancePrintStat stat = new InstancePrintStat();
// Feed the state of every stored instance into the stat accumulator.
for (int i = 0; i < instances.size(); i++) {
InstanceProfile instance = instances.get(i);
stat.stat(instance.getState());
}
LOGGER.info("instanceManager coreThread running! taskId {} memory total {} db total {} db detail {} ",
taskId, instanceMap.size(), instances.size(), stat);
// Single-line summary: in-memory instance count, stored instance count, state stat and
// the current size of the action queue.
LOGGER.info(
"instanceManager running! taskId {} mem {} db total {} {} action count {}",
taskId, instanceMap.size(), instances.size(), stat, actionQueue.size());
// Restart the throttle window from now.
lastPrintTime = AgentUtils.getCurrentTime();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,7 @@ private void printTaskDetail() {
TaskProfile task = tasksInDb.get(i);
stat.stat(task.getState());
}
LOGGER.info("taskManager coreThread running! memory total {} db total {} db detail {} ", taskMap.size(),
tasksInDb.size(), stat);
LOGGER.info("taskManager running! mem {} db total {} {} ", taskMap.size(), tasksInDb.size(), stat);
lastPrintTime = AgentUtils.getCurrentTime();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,11 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) {
}

private void asyncSendByMessageSender(SendMessageCallback cb,
List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
List<byte[]> bodyList, String groupId, String streamId, long dataTime, String msgUUID,
long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap, boolean isProxySend) throws ProxysdkException {
sender.asyncSendMessage(cb, bodyList, groupId,
streamId, dt, msgUUID,
streamId, dataTime, msgUUID,
timeout, timeUnit, extraAttrMap, isProxySend);
}

Expand Down Expand Up @@ -428,7 +428,7 @@ public void onMessageAck(SendResult result) {
message.getAckInfo().setHasAck(true);
getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId,
System.currentTimeMillis(), message.getMsgCnt(), message.getTotalSize());
dataTime, message.getMsgCnt(), message.getTotalSize());
} else {
LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, "
+ "error {}", groupId, streamId, taskId, instanceId, dataTime, retry, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;

import com.google.gson.Gson;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -80,6 +81,7 @@
import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_SOURCE_IP;
import static org.apache.inlong.agent.constant.TaskConstants.JOB_FILE_META_ENV_LIST;
import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;

/**
* Read text files
Expand All @@ -106,9 +108,9 @@ private class SourceData {
private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
private final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024;
private final Integer FINISH_READ_MAX_COUNT = 30;
private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 30;
private final Long INODE_UPDATE_INTERVAL_MS = 1000L;
private final Integer READ_WAIT_TIMEOUT_MS = 1000;
private final Integer READ_WAIT_TIMEOUT_MS = 10;
private final SimpleDateFormat RECORD_TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
public InstanceProfile profile;
private String taskId;
Expand All @@ -125,11 +127,12 @@ private class SourceData {
private BlockingQueue<SourceData> queue;
private final Gson GSON = new Gson();
private volatile boolean runnable = true;
private volatile int readEndCount = 0;
private volatile boolean fileExist = true;
private String inodeInfo;
private volatile long lastInodeUpdateTime = 0;
private volatile boolean running = false;
private long dataTime = 0;
private volatile long emptyCount = 0;

public LogFileSource() {
OffsetManager.init();
Expand All @@ -153,6 +156,8 @@ public void init(InstanceProfile profile) {
linePosition = getInitLineOffset(isIncrement, taskId, instanceId, inodeInfo);
bytePosition = getBytePositionByLine(linePosition);
queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
dataTime = DateTransUtils.timeStrConvertTomillSec(profile.getDataTime(),
profile.get(TASK_CYCLE_UNIT));
try {
registerMeta(profile);
} catch (Exception ex) {
Expand Down Expand Up @@ -249,7 +254,7 @@ private long getBytePositionByLine(long linePosition) throws IOException {
}
}
} catch (Exception e) {
LOGGER.error("getBytePositionByLine error {}", e.getStackTrace());
LOGGER.error("getBytePositionByLine error: ", e);
} finally {
if (input != null) {
input.close();
Expand Down Expand Up @@ -344,7 +349,7 @@ public Message read() {
private Message createMessage(SourceData sourceData) {
String msgWithMetaData = fillMetaData(sourceData.data);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
System.currentTimeMillis(), 1, msgWithMetaData.length());
dataTime, 1, msgWithMetaData.length());
String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId));
Map<String, String> header = new HashMap<>();
header.put(PROXY_KEY_DATA, proxyPartitionKey);
Expand Down Expand Up @@ -429,24 +434,28 @@ public Runnable coreThread() {
} catch (IOException e) {
LOGGER.error("readFromPos error {}", e.getMessage());
}
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
if (lines.isEmpty()) {
if (queue.isEmpty()) {
emptyCount++;
} else {
emptyCount = 0;
}
AgentUtils.silenceSleepInSeconds(1);
continue;
}
emptyCount = 0;
for (int i = 0; i < lines.size(); i++) {
boolean suc4Queue = waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).data.length());
if (!suc4Queue) {
break;
}
putIntoQueue(lines.get(i));
}
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
if (lines.isEmpty()) {
readEndCount++;
AgentUtils.silenceSleepInSeconds(1);
} else {
readEndCount = 0;
if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) {
lastPrintTime = AgentUtils.getCurrentTime();
LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}, reads lines size {}",
file.getName(), linePosition, bytePosition, file.length(), lines.size());
}
if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) {
lastPrintTime = AgentUtils.getCurrentTime();
LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}, reads lines size {}",
file.getName(), linePosition, bytePosition, file.length(), lines.size());
}
}
running = false;
Expand Down Expand Up @@ -530,15 +539,7 @@ private void clearQueue(BlockingQueue<SourceData> queue) {

@Override
public boolean sourceFinish() {
if (finishReadLog() && queue.isEmpty()) {
return true;
} else {
return false;
}
}

public boolean finishReadLog() {
return readEndCount > FINISH_READ_MAX_COUNT;
return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private LogFileSource getSource() {
Whitebox.setInternalState(source, "BATCH_READ_LINE_TOTAL_LEN", 10);
Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS", 0);
Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 2);
Whitebox.setInternalState(source, "FINISH_READ_MAX_COUNT", 1);
Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3);
Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10);
source.init(instanceProfile);
return source;
Expand Down Expand Up @@ -101,10 +101,7 @@ private void testFullRead() {
srcLen += check[i].getBytes(StandardCharsets.UTF_8).length;
}
LogFileSource source = getSource();
await().atMost(2, TimeUnit.SECONDS).until(() -> source.finishReadLog());
int cnt = 0;
int leftBeforeRead = MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
Assert.assertTrue(leftBeforeRead + srcLen == DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);
Message msg = source.read();
int readLen = 0;
while (msg != null) {
Expand All @@ -114,7 +111,7 @@ private void testFullRead() {
msg = source.read();
cnt++;
}
await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish());
await().atMost(6, TimeUnit.SECONDS).until(() -> source.sourceFinish());
source.destroy();
Assert.assertTrue(cnt == 3);
Assert.assertTrue(srcLen == readLen);
Expand All @@ -124,7 +121,6 @@ private void testFullRead() {

private void testCleanQueue() {
LogFileSource source = getSource();
await().atMost(2, TimeUnit.SECONDS).until(() -> source.finishReadLog());
for (int i = 0; i < 2; i++) {
source.read();
}
Expand Down

0 comments on commit de2b4d3

Please sign in to comment.