From 6b7f329715da638fe4a08bab2eb376aecb2bf2fb Mon Sep 17 00:00:00 2001 From: "Xu Han@AutoMQ" Date: Tue, 27 Feb 2024 15:00:41 +0800 Subject: [PATCH] feat: record pooled record memory usage (#846) Signed-off-by: Robin Han --- .../kafka/log/streamaspect/ElasticLogFileRecords.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java index d32a70fdc7..4f4dd72df0 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java @@ -56,6 +56,11 @@ public class ElasticLogFileRecords { private static final Logger LOGGER = LoggerFactory.getLogger(ElasticLogFileRecords.class); + private static final int POOLED_MEMORY_RECORDS = 30; + static { + DirectByteBufAlloc.registerAllocType(POOLED_MEMORY_RECORDS, "pooled_memory_records"); + } + protected final AtomicInteger size; // only used for recover protected final Iterable batches; @@ -320,7 +325,7 @@ private PooledMemoryRecords(long logBaseOffset, List fetchResults, } // TODO: create a new ByteBufMemoryRecords data struct to avoid copy if (pooled) { - this.pack = DirectByteBufAlloc.byteBuffer(size); + this.pack = DirectByteBufAlloc.byteBuffer(size, POOLED_MEMORY_RECORDS); } else { this.pack = Unpooled.buffer(size); }