Skip to content

Commit

Permalink
feat(core): metrics refine & upgrade s3stream to 0.4.2-SNAPSHOT (#450)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Nov 17, 2023
1 parent c71f28d commit a9c6277
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 10 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class ConfigUtils {

public static Config to(KafkaConfig s) {
return new Config()
.brokerId(s.brokerId())
.nodeId(s.brokerId())
.endpoint(s.s3Endpoint())
.region(s.s3Region())
.bucket(s.s3Bucket())
Expand Down
15 changes: 8 additions & 7 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package kafka.server

import com.automq.stream.s3.metrics.TimerUtil
import com.yammer.metrics.core.Histogram
import kafka.admin.AdminUtils
import kafka.api.ElectLeadersRequestOps
Expand Down Expand Up @@ -81,7 +82,7 @@ import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_I
import java.lang.{Long => JLong}
import java.nio.ByteBuffer
import java.util
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, Executors}
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.{Collections, Optional}
import scala.annotation.nowarn
Expand Down Expand Up @@ -622,7 +623,7 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle a produce request
*/
def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
val startNanos = System.nanoTime()
val timerTotal: TimerUtil = new TimerUtil()
val produceRequest = request.body[ProduceRequest]

if (RequestUtils.hasTransactionalRecords(produceRequest)) {
Expand Down Expand Up @@ -669,8 +670,8 @@ class KafkaApis(val requestChannel: RequestChannel,
@nowarn("cat=deprecation")
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
// AutoMQ for Kafka inject start
val callbackStartNanos = System.nanoTime()
PRODUCE_CALLBACK_TIME_HIST.update((callbackStartNanos - startNanos) / 1000)
val timerCallback: TimerUtil = new TimerUtil()
PRODUCE_CALLBACK_TIME_HIST.update(timerTotal.elapsedAs(TimeUnit.MICROSECONDS))
// AutoMQ for Kafka inject end


Expand Down Expand Up @@ -732,11 +733,11 @@ class KafkaApis(val requestChannel: RequestChannel,
}

// AutoMQ for Kafka inject start
PRODUCE_ACK_TIME_HIST.update((System.nanoTime() - callbackStartNanos) / 1000)
PRODUCE_ACK_TIME_HIST.update(timerCallback.elapsedAs(TimeUnit.MICROSECONDS))
val now = System.currentTimeMillis()
val lastRecordTimestamp = LAST_RECORD_TIMESTAMP.get();
if (now - lastRecordTimestamp > 60000 && LAST_RECORD_TIMESTAMP.compareAndSet(lastRecordTimestamp, now)) {
info(s"produce cost, produce=${KafkaMetricsUtil.histToString(PRODUCE_TIME_HIST)} " +
info(s"produce cost (in microseconds), produce=${KafkaMetricsUtil.histToString(PRODUCE_TIME_HIST)} " +
s"callback=${KafkaMetricsUtil.histToString(PRODUCE_CALLBACK_TIME_HIST)} " +
s"ack=${KafkaMetricsUtil.histToString(PRODUCE_ACK_TIME_HIST)}")
PRODUCE_TIME_HIST.clear()
Expand Down Expand Up @@ -773,7 +774,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
// hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
produceRequest.clearPartitionRecords()
PRODUCE_TIME_HIST.update((System.nanoTime() - startNanos) / 1000)
PRODUCE_TIME_HIST.update(timerTotal.elapsedAs(TimeUnit.MICROSECONDS))
}
// TODO: quick throttle when underline permit is not enough
// TODO: isolate to a separate thread pool to avoid blocking io thread. Connection should be bind to certain async thread to keep the order
Expand Down
2 changes: 1 addition & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ versions += [
zookeeper: "3.6.3",
zstd: "1.5.2-1",
commonLang: "3.12.0",
s3stream: "0.3.0-SNAPSHOT",
s3stream: "0.4.2-SNAPSHOT",
]
libs += [
activation: "javax.activation:activation:$versions.activation",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
Expand Down Expand Up @@ -393,7 +394,9 @@ public void printWindow() {
long elapsed = System.currentTimeMillis() - windowStart;
double recsPerSec = 1000.0 * windowCount / (double) elapsed;
double mbPerSec = 1000.0 * this.windowBytes / (double) elapsed / (1024.0 * 1024.0);
System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f ms max latency.%n",
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
System.out.printf("[%s] %d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f ms max latency.%n",
sdf.format(windowStart),
windowCount,
recsPerSec,
mbPerSec,
Expand Down

0 comments on commit a9c6277

Please sign in to comment.