Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-9284][Agent] Report audit by data time not real time #9292

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.msg.AttributeConstants;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -40,6 +42,7 @@
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;

/**
* Handles a list of proxy messages that all belong to the same stream id.
Expand All @@ -62,6 +65,7 @@ public class ProxyMessageCache {
private final AtomicLong cacheSize = new AtomicLong(0);
private Long packageIndex = 0L;
private long lastPrintTime = 0;
private long dataTime;
/**
* extra map used when sending to dataproxy
*/
Expand All @@ -78,6 +82,12 @@ public ProxyMessageCache(InstanceProfile instanceProfile, String groupId, String
this.groupId = groupId;
this.streamId = streamId;
this.inodeInfo = instanceProfile.get(TaskConstants.INODE_INFO);
try {
dataTime = DateTransUtils.timeStrConvertTomillSec(instanceProfile.getDataTime(),
instanceProfile.get(TASK_CYCLE_UNIT));
} catch (ParseException e) {
LOGGER.info("trans dataTime error", e);
}
extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields()));
}
Expand Down Expand Up @@ -163,7 +173,7 @@ public SenderMessage fetchSenderMessage() {
if (!bodyList.isEmpty()) {
PackageAckInfo ackInfo = new PackageAckInfo(packageIndex, packageOffset, resultBatchSize, false);
SenderMessage senderMessage = new SenderMessage(taskId, instanceId, groupId, streamId, bodyList,
AgentUtils.getCurrentTime(), extraMap, ackInfo);
dataTime, extraMap, ackInfo);
packageIndex++;
return senderMessage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public static long timeStrConvertTomillSec(String time, String cycleUnit)
public static long timeStrConvertTomillSec(String time, String cycleUnit, TimeZone timeZone)
throws ParseException {
long retTime = 0;
// SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
SimpleDateFormat df = null;
if (cycleUnit.equals("Y") && time.length() == 4) {
df = new SimpleDateFormat("yyyy");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,15 @@ private Runnable coreThread() {

/**
* Logs a summary of this task's instances. The body only runs when the time elapsed since
* lastPrintTime exceeds CORE_THREAD_PRINT_TIME, so the output is throttled.
*
* NOTE(review): this text comes from a flattened diff view, so the pre-change LOGGER.info calls
* and the post-change one all appear below. Confirm against the merged file which of them remain.
*/
private void printInstanceDetail() {
// Throttle: skip entirely unless enough time has passed since the previous report.
if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_TIME) {
LOGGER.info("instanceManager coreThread running! taskId {} action count {}", taskId,
actionQueue.size());
// Fetch the instances stored in instanceDb for this task and feed each state into the stat holder.
List<InstanceProfile> instances = instanceDb.getInstances(taskId);
InstancePrintStat stat = new InstancePrintStat();
for (int i = 0; i < instances.size(); i++) {
InstanceProfile instance = instances.get(i);
stat.stat(instance.getState());
}
LOGGER.info("instanceManager coreThread running! taskId {} memory total {} db total {} db detail {} ",
taskId, instanceMap.size(), instances.size(), stat);
// Single-line report: in-memory instance count, instanceDb count, per-state stat, pending action count.
LOGGER.info(
"instanceManager running! taskId {} mem {} db total {} {} action count {}",
taskId, instanceMap.size(), instances.size(), stat, actionQueue.size());
// Record the print time so the next call is throttled relative to now.
lastPrintTime = AgentUtils.getCurrentTime();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,7 @@ private void printTaskDetail() {
TaskProfile task = tasksInDb.get(i);
stat.stat(task.getState());
}
LOGGER.info("taskManager coreThread running! memory total {} db total {} db detail {} ", taskMap.size(),
tasksInDb.size(), stat);
LOGGER.info("taskManager running! mem {} db total {} {} ", taskMap.size(), tasksInDb.size(), stat);
lastPrintTime = AgentUtils.getCurrentTime();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,11 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) {
}

/**
* Thin wrapper that forwards an asynchronous batch send to sender.asyncSendMessage, passing
* every argument through unchanged and in the same order.
*
* NOTE(review): this text comes from a flattened diff view. The second parameter line and the
* second argument line each appear twice (old name dt, new name dataTime); only one of each
* pair exists in the real file, so confirm against the merged source.
*
* @throws ProxysdkException declared on this method; presumably raised by the sender call, to be confirmed
*/
private void asyncSendByMessageSender(SendMessageCallback cb,
List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
List<byte[]> bodyList, String groupId, String streamId, long dataTime, String msgUUID,
long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap, boolean isProxySend) throws ProxysdkException {
sender.asyncSendMessage(cb, bodyList, groupId,
streamId, dt, msgUUID,
streamId, dataTime, msgUUID,
timeout, timeUnit, extraAttrMap, isProxySend);
}

Expand Down Expand Up @@ -428,7 +428,7 @@ public void onMessageAck(SendResult result) {
message.getAckInfo().setHasAck(true);
getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId,
System.currentTimeMillis(), message.getMsgCnt(), message.getTotalSize());
dataTime, message.getMsgCnt(), message.getTotalSize());
} else {
LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, "
+ "error {}", groupId, streamId, taskId, instanceId, dataTime, retry, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;

import com.google.gson.Gson;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -80,6 +81,7 @@
import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_SOURCE_IP;
import static org.apache.inlong.agent.constant.TaskConstants.JOB_FILE_META_ENV_LIST;
import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;

/**
* Read text files
Expand All @@ -106,9 +108,9 @@ private class SourceData {
private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
private final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024;
private final Integer FINISH_READ_MAX_COUNT = 30;
private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 30;
private final Long INODE_UPDATE_INTERVAL_MS = 1000L;
private final Integer READ_WAIT_TIMEOUT_MS = 1000;
private final Integer READ_WAIT_TIMEOUT_MS = 10;
private final SimpleDateFormat RECORD_TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
public InstanceProfile profile;
private String taskId;
Expand All @@ -125,11 +127,12 @@ private class SourceData {
private BlockingQueue<SourceData> queue;
private final Gson GSON = new Gson();
private volatile boolean runnable = true;
private volatile int readEndCount = 0;
private volatile boolean fileExist = true;
private String inodeInfo;
private volatile long lastInodeUpdateTime = 0;
private volatile boolean running = false;
private long dataTime = 0;
private volatile long emptyCount = 0;

public LogFileSource() {
OffsetManager.init();
Expand All @@ -153,6 +156,8 @@ public void init(InstanceProfile profile) {
linePosition = getInitLineOffset(isIncrement, taskId, instanceId, inodeInfo);
bytePosition = getBytePositionByLine(linePosition);
queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
dataTime = DateTransUtils.timeStrConvertTomillSec(profile.getDataTime(),
profile.get(TASK_CYCLE_UNIT));
try {
registerMeta(profile);
} catch (Exception ex) {
Expand Down Expand Up @@ -249,7 +254,7 @@ private long getBytePositionByLine(long linePosition) throws IOException {
}
}
} catch (Exception e) {
LOGGER.error("getBytePositionByLine error {}", e.getStackTrace());
LOGGER.error("getBytePositionByLine error: ", e);
} finally {
if (input != null) {
input.close();
Expand Down Expand Up @@ -344,7 +349,7 @@ public Message read() {
private Message createMessage(SourceData sourceData) {
String msgWithMetaData = fillMetaData(sourceData.data);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
System.currentTimeMillis(), 1, msgWithMetaData.length());
dataTime, 1, msgWithMetaData.length());
String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId));
Map<String, String> header = new HashMap<>();
header.put(PROXY_KEY_DATA, proxyPartitionKey);
Expand Down Expand Up @@ -429,24 +434,28 @@ public Runnable coreThread() {
} catch (IOException e) {
LOGGER.error("readFromPos error {}", e.getMessage());
}
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
if (lines.isEmpty()) {
if (queue.isEmpty()) {
emptyCount++;
} else {
emptyCount = 0;
}
AgentUtils.silenceSleepInSeconds(1);
continue;
}
emptyCount = 0;
for (int i = 0; i < lines.size(); i++) {
boolean suc4Queue = waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).data.length());
if (!suc4Queue) {
break;
}
putIntoQueue(lines.get(i));
}
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
if (lines.isEmpty()) {
readEndCount++;
AgentUtils.silenceSleepInSeconds(1);
} else {
readEndCount = 0;
if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) {
lastPrintTime = AgentUtils.getCurrentTime();
LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}, reads lines size {}",
file.getName(), linePosition, bytePosition, file.length(), lines.size());
}
if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) {
lastPrintTime = AgentUtils.getCurrentTime();
LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}, reads lines size {}",
file.getName(), linePosition, bytePosition, file.length(), lines.size());
}
}
running = false;
Expand Down Expand Up @@ -530,15 +539,7 @@ private void clearQueue(BlockingQueue<SourceData> queue) {

/**
* Reports whether this source is considered finished.
*
* NOTE(review): this text comes from a flattened diff view, so two implementations are
* interleaved below. One is the if/else body together with finishReadLog(), which compares
* readEndCount with FINISH_READ_MAX_COUNT. The other is the single statement
* "return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;". Confirm against the merged file which
* lines remain before relying on this text.
*/
@Override
public boolean sourceFinish() {
// As shown: finished only when finishReadLog() is true and the queue is empty.
if (finishReadLog() && queue.isEmpty()) {
return true;
} else {
return false;
}
}

// As shown: true once readEndCount exceeds FINISH_READ_MAX_COUNT.
public boolean finishReadLog() {
return readEndCount > FINISH_READ_MAX_COUNT;
// Second return statement from the diff: true once emptyCount exceeds EMPTY_CHECK_COUNT_AT_LEAST.
return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private LogFileSource getSource() {
Whitebox.setInternalState(source, "BATCH_READ_LINE_TOTAL_LEN", 10);
Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS", 0);
Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 2);
Whitebox.setInternalState(source, "FINISH_READ_MAX_COUNT", 1);
Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3);
Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10);
source.init(instanceProfile);
return source;
Expand Down Expand Up @@ -101,10 +101,7 @@ private void testFullRead() {
srcLen += check[i].getBytes(StandardCharsets.UTF_8).length;
}
LogFileSource source = getSource();
await().atMost(2, TimeUnit.SECONDS).until(() -> source.finishReadLog());
int cnt = 0;
int leftBeforeRead = MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
Assert.assertTrue(leftBeforeRead + srcLen == DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);
Message msg = source.read();
int readLen = 0;
while (msg != null) {
Expand All @@ -114,7 +111,7 @@ private void testFullRead() {
msg = source.read();
cnt++;
}
await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish());
await().atMost(6, TimeUnit.SECONDS).until(() -> source.sourceFinish());
source.destroy();
Assert.assertTrue(cnt == 3);
Assert.assertTrue(srcLen == readLen);
Expand All @@ -124,7 +121,6 @@ private void testFullRead() {

private void testCleanQueue() {
LogFileSource source = getSource();
await().atMost(2, TimeUnit.SECONDS).until(() -> source.finishReadLog());
for (int i = 0; i < 2; i++) {
source.read();
}
Expand Down