Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream-client): perf and optimize #70

Merged
merged 3 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,8 @@ project(':core') {
implementation libs.slf4jlog4j
implementation libs.s3Client

implementation libs.zstd

compileOnly libs.log4j


Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control-core.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.mockito" />
<allow pkg="software.amazon.awssdk" />
<allow pkg="com.github.luben.zstd" />


<!-- see KIP-544 for why KafkaYammerMetrics should be used instead of the global default yammer metrics registry
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1102,10 +1102,10 @@ class Partition(val topicPartition: TopicPartition,
}
val lastRecordTimestamp = LAST_RECORD_TIMESTAMP.get()
val now = System.currentTimeMillis()
if (now - lastRecordTimestamp > 1000 && LAST_RECORD_TIMESTAMP.compareAndSet(lastRecordTimestamp, now)) {
if (now - lastRecordTimestamp > 10000 && LAST_RECORD_TIMESTAMP.compareAndSet(lastRecordTimestamp, now)) {
val updateWatermarkStatistics = UPDATE_WATERMARK_TIMER.getAndReset()
val tryCompleteStatistics = TRY_COMPLETE_TIMER.getAndReset()
logger.warn(s"handle offset move cost, maybeIncrementLeaderHW=$updateWatermarkStatistics, tryCompleteDelayedRequests=$tryCompleteStatistics")
logger.trace(s"handle offset move cost, maybeIncrementLeaderHW=$updateWatermarkStatistics, tryCompleteDelayedRequests=$tryCompleteStatistics")
}
}
// elastic stream inject end
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/es/ElasticLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,13 @@ class ElasticLog(val metaStream: MetaStream,
private def tryAppendStatistics(): Unit = {
val lastRecordTimestamp = LAST_RECORD_TIMESTAMP.get()
val now = System.currentTimeMillis()
if (now - lastRecordTimestamp > 1000 && LAST_RECORD_TIMESTAMP.compareAndSet(lastRecordTimestamp, now)) {
if (now - lastRecordTimestamp > 10000 && LAST_RECORD_TIMESTAMP.compareAndSet(lastRecordTimestamp, now)) {
val permitAcquireFailStatistics = APPEND_PERMIT_ACQUIRE_FAIL_TIMER.getAndReset()
val remainingPermits = APPEND_PERMIT_SEMAPHORE.availablePermits()
val appendStatistics = APPEND_TIMER.getAndReset()
val callbackStatistics = APPEND_CALLBACK_TIMER.getAndReset()
val ackStatistics = APPEND_ACK_TIMER.getAndReset()
logger.warn(s"log append cost, permitAcquireFail=$permitAcquireFailStatistics, remainingPermit=$remainingPermits/$APPEND_PERMIT ,append=$appendStatistics, callback=$callbackStatistics, ack=$ackStatistics")
logger.info(s"log append cost, permitAcquireFail=$permitAcquireFailStatistics, remainingPermit=$remainingPermits/$APPEND_PERMIT ,append=$appendStatistics, callback=$callbackStatistics, ack=$ackStatistics")
}
}

Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/log/es/metrics/Counter.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public void inc() {
count.add(1);
}

public void inc(int n) {
count.add(n);
}

public Statistics getAndReset() {
long count = this.count.sumThenReset();
return new Statistics(count);
Expand Down
9 changes: 6 additions & 3 deletions core/src/main/scala/kafka/log/s3/ObjectReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.operator.S3Operator;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.compress.ZstdFactory;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.metadata.stream.S3ObjectMetadata;
import org.slf4j.Logger;
Expand Down Expand Up @@ -253,7 +252,11 @@ public DataBlock(ByteBuf buf, int recordCount) {
public CloseableIterator<StreamRecordBatch> iterator() {
ByteBuf buf = this.buf.duplicate();
AtomicInteger remainingRecordCount = new AtomicInteger(recordCount);
DataInputStream in = new DataInputStream(ZstdFactory.wrapForInput(buf.nioBuffer(), (byte) 0, BufferSupplier.NO_CACHING));
// skip magic and flag
buf.skipBytes(2);
// TODO: check flag, use uncompressed stream or compressed stream.
// DataInputStream in = new DataInputStream(ZstdFactory.wrapForInput(buf.nioBuffer(), (byte) 0, BufferSupplier.NO_CACHING));
DataInputStream in = new DataInputStream(new ByteBufferInputStream(buf.nioBuffer()));
return new CloseableIterator<>() {
@Override
public boolean hasNext() {
Expand Down
Loading