feat(stream-client): perf and optimize (#70)
* feat(stream-client): perf and optimize

- change buffer to direct
- add S3Storage cache limit protection
- add object data block flag (to support compression in the future)

Signed-off-by: Robin Han <[email protected]>

* fix: fix checkstyle

Signed-off-by: Robin Han <[email protected]>

* fix: improve test pass rate

Signed-off-by: Robin Han <[email protected]>

---------

Signed-off-by: Robin Han <[email protected]>
superhx authored Sep 7, 2023
1 parent 33c66a6 commit f262b29
Showing 20 changed files with 352 additions and 264 deletions.
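Editor's note: the "object data block flag" bullet surfaces in the ObjectReader diff below as a two-byte prefix (magic plus flag) that the reader currently skips. A minimal sketch of how such a header could be written and checked, assuming a one-byte magic and a flag byte whose low bit would mark compression; BLOCK_MAGIC, FLAG_COMPRESSED, and the class itself are illustrative names, not taken from this repository:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public final class DataBlockHeader {
    static final byte BLOCK_MAGIC = (byte) 0x8E;  // hypothetical magic value
    static final byte FLAG_COMPRESSED = 0x01;     // hypothetical flag bit

    // Prepend the two-byte header to the encoded records.
    static ByteBuf encode(ByteBuf records, boolean compressed) {
        // direct buffer, per the "change buffer to direct" bullet
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(2 + records.readableBytes());
        buf.writeByte(BLOCK_MAGIC);
        buf.writeByte(compressed ? FLAG_COMPRESSED : 0);
        buf.writeBytes(records);
        return buf;
    }

    // Mirror of the read path: consume magic + flag, report compression.
    static boolean readHeaderIsCompressed(ByteBuf buf) {
        if (buf.readByte() != BLOCK_MAGIC) {
            throw new IllegalStateException("unexpected data block magic");
        }
        return (buf.readByte() & FLAG_COMPRESSED) != 0;
    }
}

Allocating from PooledByteBufAllocator.DEFAULT.directBuffer also echoes the first bullet: direct buffers avoid an extra on-heap copy when the bytes are handed off to the network or S3 client.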
2 changes: 2 additions & 0 deletions build.gradle
@@ -1021,6 +1021,8 @@ project(':core') {
 implementation libs.slf4jlog4j
 implementation libs.s3Client

+implementation libs.zstd
+
 compileOnly libs.log4j

1 change: 1 addition & 0 deletions checkstyle/import-control-core.xml
@@ -38,6 +38,7 @@
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.mockito" />
<allow pkg="software.amazon.awssdk" />
<allow pkg="com.github.luben.zstd" />


<!-- see KIP-544 for why KafkaYammerMetrics should be used instead of the global default yammer metrics registry
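The new allowance admits zstd-jni's package, matching the libs.zstd dependency added to build.gradle above. A tiny round-trip sketch of the kind of call this permits, assuming libs.zstd resolves to com.github.luben:zstd-jni:

import com.github.luben.zstd.Zstd;
import java.nio.charset.StandardCharsets;

public class ZstdRoundTrip {
    public static void main(String[] args) {
        byte[] payload = "stream record payload".getBytes(StandardCharsets.UTF_8);
        byte[] compressed = Zstd.compress(payload);
        // the simple byte[] API needs the original size to decompress
        byte[] restored = Zstd.decompress(compressed, payload.length);
        System.out.println(new String(restored, StandardCharsets.UTF_8));
    }
}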
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -1102,10 +1102,10 @@ class Partition(val topicPartition: TopicPartition,
   }
   val lastRecordTimestamp = LAST_RECORD_TIMESTAMP.get()
   val now = System.currentTimeMillis()
-  if (now - lastRecordTimestamp > 1000 && LAST_RECORD_TIMESTAMP.compareAndSet(lastRecordTimestamp, now)) {
+  if (now - lastRecordTimestamp > 10000 && LAST_RECORD_TIMESTAMP.compareAndSet(lastRecordTimestamp, now)) {
     val updateWatermarkStatistics = UPDATE_WATERMARK_TIMER.getAndReset()
     val tryCompleteStatistics = TRY_COMPLETE_TIMER.getAndReset()
-    logger.warn(s"handle offset move cost, maybeIncrementLeaderHW=$updateWatermarkStatistics, tryCompleteDelayedRequests=$tryCompleteStatistics")
+    logger.trace(s"handle offset move cost, maybeIncrementLeaderHW=$updateWatermarkStatistics, tryCompleteDelayedRequests=$tryCompleteStatistics")
   }
 }
 // elastic stream inject end
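This hunk (and the ElasticLog one below) relies on a lock-free throttle: every thread reads a shared timestamp, but only the thread that wins the compareAndSet race emits the statistics line, so the log is written at most once per interval no matter how many threads pass through. A standalone sketch of the pattern with hypothetical names, using the 10-second interval this commit adopts:

import java.util.concurrent.atomic.AtomicLong;

public class ThrottledReporter {
    private static final AtomicLong LAST_RECORD_TIMESTAMP = new AtomicLong(0);
    private static final long INTERVAL_MS = 10_000; // raised from 1_000 by this commit

    // Of all threads racing past the interval check, exactly one wins the
    // compareAndSet and runs the report; the rest fall through silently.
    public static void maybeReport(Runnable report) {
        long last = LAST_RECORD_TIMESTAMP.get();
        long now = System.currentTimeMillis();
        if (now - last > INTERVAL_MS && LAST_RECORD_TIMESTAMP.compareAndSet(last, now)) {
            report.run();
        }
    }
}

Demoting the line from warn to trace fits the same intent: the measurement stays available, but it no longer floods production logs.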
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/es/ElasticLog.scala
@@ -229,13 +229,13 @@ class ElasticLog(val metaStream: MetaStream,
 private def tryAppendStatistics(): Unit = {
   val lastRecordTimestamp = LAST_RECORD_TIMESTAMP.get()
   val now = System.currentTimeMillis()
-  if (now - lastRecordTimestamp > 1000 && LAST_RECORD_TIMESTAMP.compareAndSet(lastRecordTimestamp, now)) {
+  if (now - lastRecordTimestamp > 10000 && LAST_RECORD_TIMESTAMP.compareAndSet(lastRecordTimestamp, now)) {
     val permitAcquireFailStatistics = APPEND_PERMIT_ACQUIRE_FAIL_TIMER.getAndReset()
     val remainingPermits = APPEND_PERMIT_SEMAPHORE.availablePermits()
     val appendStatistics = APPEND_TIMER.getAndReset()
     val callbackStatistics = APPEND_CALLBACK_TIMER.getAndReset()
     val ackStatistics = APPEND_ACK_TIMER.getAndReset()
-    logger.warn(s"log append cost, permitAcquireFail=$permitAcquireFailStatistics, remainingPermit=$remainingPermits/$APPEND_PERMIT ,append=$appendStatistics, callback=$callbackStatistics, ack=$ackStatistics")
+    logger.info(s"log append cost, permitAcquireFail=$permitAcquireFailStatistics, remainingPermit=$remainingPermits/$APPEND_PERMIT ,append=$appendStatistics, callback=$callbackStatistics, ack=$ackStatistics")
   }
 }

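The names in this hunk suggest appends are admission-controlled: a semaphore of APPEND_PERMIT permits bounds in-flight appends, and time lost acquiring a permit is tracked separately (permitAcquireFail) from the append, callback, and ack phases. A hedged sketch of that shape; the permit count and the reporting here are invented, only the semaphore-per-append structure is implied by the diff:

import java.util.concurrent.Semaphore;

public class AppendPermits {
    private static final int APPEND_PERMIT = 1024; // illustrative pool size
    private static final Semaphore APPEND_PERMIT_SEMAPHORE = new Semaphore(APPEND_PERMIT);

    // Bound in-flight appends; time spent blocked on a permit is what a
    // permit-acquire timer like the one in the log line above would capture.
    public static void append(Runnable doAppend) throws InterruptedException {
        long start = System.nanoTime();
        APPEND_PERMIT_SEMAPHORE.acquire();
        long waitedMs = (System.nanoTime() - start) / 1_000_000;
        if (waitedMs > 0) {
            System.out.printf("waited %d ms for a permit, remaining=%d/%d%n",
                    waitedMs, APPEND_PERMIT_SEMAPHORE.availablePermits(), APPEND_PERMIT);
        }
        try {
            doAppend.run();
        } finally {
            APPEND_PERMIT_SEMAPHORE.release();
        }
    }
}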
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/log/es/metrics/Counter.java
@@ -29,6 +29,10 @@ public void inc() {
   count.add(1);
 }

+public void inc(int n) {
+  count.add(n);
+}
+
 public Statistics getAndReset() {
   long count = this.count.sumThenReset();
   return new Statistics(count);
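Taken together, the two hunks imply Counter wraps a LongAdder, with getAndReset draining the sum into a Statistics snapshot via sumThenReset; the new overload lets callers record a batch in one call. A compacted view of that contract (the Statistics shape here is assumed):

import java.util.concurrent.atomic.LongAdder;

public class Counter {
    private final LongAdder count = new LongAdder();

    public void inc() {
        count.add(1);
    }

    // New in this commit: record a whole batch with one call.
    public void inc(int n) {
        count.add(n);
    }

    // sumThenReset drains the adder, so each snapshot covers one interval.
    public Statistics getAndReset() {
        return new Statistics(count.sumThenReset());
    }

    public record Statistics(long count) { } // assumed shape of the snapshot
}

So a caller that appends, say, 17 records at once can write counter.inc(17) rather than looping inc().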
9 changes: 6 additions & 3 deletions core/src/main/scala/kafka/log/s3/ObjectReader.java
@@ -21,8 +21,7 @@
 import kafka.log.s3.model.StreamRecordBatch;
 import kafka.log.s3.operator.S3Operator;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.compress.ZstdFactory;
-import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.metadata.stream.S3ObjectMetadata;
 import org.slf4j.Logger;
@@ -253,7 +252,11 @@ public DataBlock(ByteBuf buf, int recordCount) {
 public CloseableIterator<StreamRecordBatch> iterator() {
     ByteBuf buf = this.buf.duplicate();
     AtomicInteger remainingRecordCount = new AtomicInteger(recordCount);
-    DataInputStream in = new DataInputStream(ZstdFactory.wrapForInput(buf.nioBuffer(), (byte) 0, BufferSupplier.NO_CACHING));
+    // skip magic and flag
+    buf.skipBytes(2);
+    // TODO: check flag, use uncompressed stream or compressed stream.
+    // DataInputStream in = new DataInputStream(ZstdFactory.wrapForInput(buf.nioBuffer(), (byte) 0, BufferSupplier.NO_CACHING));
+    DataInputStream in = new DataInputStream(new ByteBufferInputStream(buf.nioBuffer()));
     return new CloseableIterator<>() {
         @Override
         public boolean hasNext() {
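The TODO marks where the flag byte, currently skipped, would eventually be consulted: pick a plain stream for uncompressed blocks or a Zstd-wrapped one for compressed blocks, reusing the ZstdFactory.wrapForInput call from the removed (and commented-out) line. One possible shape of that branch; the flag-bit convention is an assumption:

import java.io.DataInputStream;
import java.io.InputStream;
import io.netty.buffer.ByteBuf;
import org.apache.kafka.common.compress.ZstdFactory;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferInputStream;

public final class DataBlockStreams {
    private static final int FLAG_COMPRESSED = 0x01; // assumed flag-bit convention

    // Hypothetical resolution of the TODO: consult the flag byte instead of
    // unconditionally skipping it.
    static DataInputStream openRecordStream(ByteBuf buf) {
        buf.skipBytes(1); // magic
        byte flag = buf.readByte();
        InputStream raw = (flag & FLAG_COMPRESSED) != 0
                ? ZstdFactory.wrapForInput(buf.nioBuffer(), (byte) 0, BufferSupplier.NO_CACHING)
                : new ByteBufferInputStream(buf.nioBuffer());
        return new DataInputStream(raw);
    }
}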
