Introduce setting for committing without backpressure (#883)
ennru authored Sep 5, 2019
1 parent e8a9e17 · commit e1e398c
Showing 15 changed files with 367 additions and 50 deletions.
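The heart of the change is a new `CommitDelivery` mode on `CommitterSettings`: `WaitForAck` keeps the existing backpressured behaviour, while `SendAndForget` emits commits without waiting for broker acknowledgement. Below is a minimal, hedged sketch of how the new mode plugs into a consumer stream, mirroring the benchmark code in this diff — the object name, topic, group id, and bootstrap servers are illustrative placeholders, and Alpakka Kafka 1.1 with Akka 2.5 APIs is assumed:

import akka.actor.ActorSystem
import akka.kafka.{CommitDelivery, CommitterSettings, ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Keep
import org.apache.kafka.common.serialization.StringDeserializer

object CommitAndForgetSketch extends App {
  implicit val system: ActorSystem = ActorSystem("commit-and-forget-sketch")
  implicit val mat: Materializer = ActorMaterializer()

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // placeholder
      .withGroupId("example-group") // placeholder

  // SendAndForget hands offsets to the committing actor without expecting replies,
  // so slow commit round-trips no longer backpressure the stream.
  val committerSettings =
    CommitterSettings(system).withDelivery(CommitDelivery.SendAndForget)

  val control =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("example-topic")) // placeholder
      .map(_.committableOffset)
      .toMat(Committer.sink(committerSettings))(Keep.both)
      .mapMaterializedValue(DrainingControl.apply)
      .run()

  // When shutting down:
  // control.drainAndShutdown()(system.dispatcher)
}

The trade-off stays within at-least-once semantics but widens the window for duplicates: commits may still be in flight when the stream stops, and a lost commit raises no error in the stream.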
16 changes: 9 additions & 7 deletions benchmarks/src/it/scala/akka/kafka/benchmarks/Benchmarks.scala
@@ -11,6 +11,8 @@ import org.scalatest.FlatSpecLike
import scala.concurrent.duration._

abstract class BenchmarksBase() extends ScalatestKafkaSpec(0) with FlatSpecLike {

+ // Message count multiplier to adapt for shorter local testing
+ val factor = 1000

override def bootstrapServers: String =
@@ -56,14 +58,14 @@ class AlpakkaKafkaPlainConsumer extends BenchmarksBase() {

class ApacheKafkaBatchedConsumer extends BenchmarksBase() {
it should "bench with small messages" in {
val cmd = RunTestCommand("apache-kafka-batched-consumer", bootstrapServers, 1000 * 1000, 100)
val cmd = RunTestCommand("apache-kafka-batched-consumer", bootstrapServers, 1000 * factor, 100)
runPerfTest(cmd,
KafkaConsumerFixtures.filledTopics(cmd),
KafkaConsumerBenchmarks.consumerAtLeastOnceBatched(batchSize = 1000))
}

it should "bench with normal messages" in {
val cmd = RunTestCommand("apache-kafka-batched-consumer-normal-msg", bootstrapServers, 1000 * 1000, 5 * 1000)
val cmd = RunTestCommand("apache-kafka-batched-consumer-normal-msg", bootstrapServers, 1000 * factor, 5 * 1000)
runPerfTest(cmd,
KafkaConsumerFixtures.filledTopics(cmd),
KafkaConsumerBenchmarks.consumerAtLeastOnceBatched(batchSize = 1000))
@@ -72,7 +74,7 @@ class ApacheKafkaBatchedConsumer extends BenchmarksBase() {
it should "bench with normal messages and eight partitions" in {
val cmd = RunTestCommand("apache-kafka-batched-consumer-normal-msg-8-partitions",
bootstrapServers,
- msgCount = 1000 * 1000,
+ msgCount = 1000 * factor,
msgSize = 5 * 1000,
numberOfPartitions = 8)
runPerfTest(cmd,
@@ -165,14 +167,14 @@ class AlpakkaKafkaPlainProducer extends BenchmarksBase() {

class ApacheKafkaTransactions extends BenchmarksBase() {
it should "bench with small messages" in {
val cmd = RunTestCommand("apache-kafka-transactions", bootstrapServers, 100 * 1000, 100)
val cmd = RunTestCommand("apache-kafka-transactions", bootstrapServers, 100 * factor, 100)
runPerfTest(cmd,
KafkaTransactionFixtures.initialize(cmd),
KafkaTransactionBenchmarks.consumeTransformProduceTransaction(commitInterval = 100.milliseconds))
}

it should "bench with normal messages" in {
val cmd = RunTestCommand("apache-kafka-transactions-normal-msg", bootstrapServers, 100 * 1000, 5000)
val cmd = RunTestCommand("apache-kafka-transactions-normal-msg", bootstrapServers, 100 * factor, 5000)
runPerfTest(cmd,
KafkaTransactionFixtures.initialize(cmd),
KafkaTransactionBenchmarks.consumeTransformProduceTransaction(commitInterval = 100.milliseconds))
@@ -181,7 +183,7 @@ class ApacheKafkaTransactions extends BenchmarksBase() {

class AlpakkaKafkaTransactions extends BenchmarksBase() {
it should "bench with small messages" in {
val cmd = RunTestCommand("alpakka-kafka-transactions", bootstrapServers, 100 * 1000, 100)
val cmd = RunTestCommand("alpakka-kafka-transactions", bootstrapServers, 100 * factor, 100)
runPerfTest(
cmd,
ReactiveKafkaTransactionFixtures.transactionalSourceAndSink(cmd, commitInterval = 100.milliseconds),
Expand All @@ -190,7 +192,7 @@ class AlpakkaKafkaTransactions extends BenchmarksBase() {
}

it should "bench with normal messages" in {
val cmd = RunTestCommand("alpakka-kafka-transactions-normal-msg", bootstrapServers, 100 * 1000, 5000)
val cmd = RunTestCommand("alpakka-kafka-transactions-normal-msg", bootstrapServers, 100 * factor, 5000)
runPerfTest(
cmd,
ReactiveKafkaTransactionFixtures.transactionalSourceAndSink(cmd, commitInterval = 100.milliseconds),
@@ -0,0 +1,63 @@
package akka.kafka.benchmarks

import akka.kafka.benchmarks.Timed.runPerfTest
import akka.kafka.benchmarks.app.RunTestCommand

class RawKafkaCommitEveryPollConsumer extends BenchmarksBase() {
private val prefix = "apache-kafka-batched-no-pausing-"

it should "bench with small messages" in {
val cmd = RunTestCommand(prefix + "consumer", bootstrapServers, 1000 * factor, 100)
runPerfTest(cmd,
KafkaConsumerFixtures.filledTopics(cmd),
KafkaConsumerBenchmarks.consumerAtLeastOnceCommitEveryPoll())
}

it should "bench with normal messages" in {
val cmd = RunTestCommand(prefix + "consumer-normal-msg", bootstrapServers, 1000 * factor, 5 * 1000)
runPerfTest(cmd,
KafkaConsumerFixtures.filledTopics(cmd),
KafkaConsumerBenchmarks.consumerAtLeastOnceCommitEveryPoll())
}

it should "bench with normal messages and eight partitions" in {
val cmd = RunTestCommand(prefix + "consumer-normal-msg-8-partitions",
bootstrapServers,
msgCount = 1000 * factor,
msgSize = 5 * 1000,
numberOfPartitions = 8)
runPerfTest(cmd,
KafkaConsumerFixtures.filledTopics(cmd),
KafkaConsumerBenchmarks.consumerAtLeastOnceCommitEveryPoll())
}
}

class AlpakkaCommitAndForgetConsumer extends BenchmarksBase() {
val prefix = "alpakka-kafka-commit-and-forget-"

it should "bench with small messages" in {
val cmd = RunTestCommand(prefix + "consumer", bootstrapServers, 1000 * factor, 100)
runPerfTest(cmd,
ReactiveKafkaConsumerFixtures.committableSources(cmd),
ReactiveKafkaConsumerBenchmarks.consumerCommitAndForget(commitBatchSize = 1000))
}

it should "bench with normal messages" in {
val cmd = RunTestCommand(prefix + "normal-msg", bootstrapServers, 1000 * factor, 5 * 1000)
runPerfTest(cmd,
ReactiveKafkaConsumerFixtures.committableSources(cmd),
ReactiveKafkaConsumerBenchmarks.consumerCommitAndForget(commitBatchSize = 1000))
}

it should "bench with normal messages and eight partitions" in {
val cmd = RunTestCommand(prefix + "normal-msg-8-partitions",
bootstrapServers,
msgCount = 1000 * factor,
msgSize = 5 * 1000,
numberOfPartitions = 8)
runPerfTest(cmd,
ReactiveKafkaConsumerFixtures.committableSources(cmd),
ReactiveKafkaConsumerBenchmarks.consumerCommitAndForget(commitBatchSize = 1000))
}
}

@@ -68,14 +68,17 @@ object KafkaConsumerBenchmarks extends LazyLogging {
def consumerAtLeastOnceBatched(batchSize: Int)(fixture: KafkaConsumerTestFixture, meter: Meter): Unit = {
val consumer = fixture.consumer

- var lastProcessedOffset = 0L
+ var lastProcessedOffset = Map.empty[Int, Long]
var accumulatedMsgCount = 0L
var commitInProgress = false
val assignment = consumer.assignment()

def doCommit(): Unit = {
accumulatedMsgCount = 0
- val offsetMap = Map(new TopicPartition(fixture.topic, 0) -> new OffsetAndMetadata(lastProcessedOffset))
+ val offsetMap = lastProcessedOffset.map {
+   case (partition, offset) =>
+     new TopicPartition(fixture.topic, partition) -> new OffsetAndMetadata(offset)
+ }
logger.debug("Committing offset " + offsetMap.head._2.offset())
consumer.commitAsync(
offsetMap.asJava,
@@ -84,6 +87,7 @@
commitInProgress = false
}
)
+ lastProcessedOffset = Map.empty[Int, Long]
}

@tailrec
@@ -98,7 +102,7 @@
for (record <- records.iterator().asScala) {
accumulatedMsgCount = accumulatedMsgCount + 1
meter.mark()
- lastProcessedOffset = record.offset()
+ lastProcessedOffset += record.partition() -> record.offset()
if (accumulatedMsgCount >= batchSize) {
if (!commitInProgress) {
commitInProgress = true
@@ -116,6 +120,50 @@
fixture.close()
}

/**
* Reads messages from topic in a loop and commits all read offsets.
*/
def consumerAtLeastOnceCommitEveryPoll()(fixture: KafkaConsumerTestFixture, meter: Meter): Unit = {
val consumer = fixture.consumer

var lastProcessedOffset = Map.empty[Int, Long]

def doCommit(): Unit = {
val offsetMap = lastProcessedOffset.map {
case (partition, offset) =>
new TopicPartition(fixture.topic, partition) -> new OffsetAndMetadata(offset)
}
logger.debug("Committing offset " + offsetMap.head._2.offset())
consumer.commitAsync(
offsetMap.asJava,
new OffsetCommitCallback {
override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit = ()
}
)
lastProcessedOffset = Map.empty[Int, Long]
}

@tailrec
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
if (readSoFar >= readLimit)
readSoFar
else {
logger.debug("Polling")
val records = consumer.poll(pollTimeoutMs)
for (record <- records.iterator().asScala) {
meter.mark()
lastProcessedOffset += record.partition() -> record.offset()
}
doCommit()
val recordCount = records.count()
logger.debug(s"${readSoFar + recordCount} records read. Limit = $readLimit")
pollInLoop(readLimit, readSoFar + recordCount)
}

pollInLoop(readLimit = fixture.msgCount)
fixture.close()
}

/**
* Reads messages from topic in a loop and commits each single message.
*/
@@ -27,7 +27,7 @@ private[benchmarks] trait PerfFixtureHelpers extends LazyLogging {
import PerfFixtureHelpers._

val producerTimeout = 6 minutes
- val logPercentStep = 1
+ val logPercentStep = 25

def randomId() = UUID.randomUUID().toString

@@ -9,9 +9,10 @@ import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.dispatch.ExecutionContexts
- import akka.kafka.CommitterSettings
+ import akka.kafka.{CommitDelivery, CommitterSettings}
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.Committer
+ import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.codahale.metrics.Meter
@@ -87,6 +88,36 @@ object ReactiveKafkaConsumerBenchmarks extends LazyLogging {
logger.debug("Stream finished")
}

/**
* Reads elements from Kafka source and commits in batches with no backpressure on committing.
*/
def consumerCommitAndForget(
commitBatchSize: Int
)(fixture: CommittableFixture, meter: Meter)(implicit sys: ActorSystem, mat: Materializer): Unit = {
logger.debug("Creating and starting a stream")
val committerDefaults = CommitterSettings(sys)
val promise = Promise[Unit]
val counter = new AtomicInteger(fixture.numberOfPartitions)
val control = fixture.source
.map { msg =>
meter.mark()
if (counter.decrementAndGet() == 0) promise.complete(Success(()))
msg.committableOffset
}
.toMat(
Committer
.sink(committerDefaults.withDelivery(CommitDelivery.SendAndForget).withMaxBatch(commitBatchSize.toLong))
)(
Keep.both
)
.mapMaterializedValue(DrainingControl.apply)
.run()

Await.result(promise.future, streamingTimeout)
control.drainAndShutdown()(sys.dispatcher)
logger.debug("Stream finished")
}

/**
* Reads elements from Kafka source and commits each one immediately after read.
*/
5 changes: 5 additions & 0 deletions core/src/main/mima-filters/1.0.x.backwards.excludes
@@ -1,2 +1,7 @@
# always exclude changes to the internal APIs
ProblemFilters.exclude[Problem]("akka.kafka.internal.*")

# PR #883 committing without backpressure
# https://github.com/akka/alpakka-kafka/pull/883
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.kafka.ConsumerMessage#CommittableOffsetBatch.tellCommit")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.CommitterSettings.this")
6 changes: 6 additions & 0 deletions core/src/main/resources/reference.conf
@@ -133,6 +133,12 @@ akka.kafka.committer {

# Parallelism for async committing
parallelism = 100

# API may change.
# Delivery of commits to the internal actor
# WaitForAck: Expect replies for commits, and backpressure the stream if replies do not arrive.
# SendAndForget: Send off commits to the internal actor without expecting replies (experimental feature since 1.1)
delivery = WaitForAck
}
# // #committer-settings
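The same choice can be made per application through configuration; the following is a sketch of an application.conf override using the setting name introduced in the hunk above:

akka.kafka.committer {
  # Opt in to the experimental commit-and-forget delivery (since 1.1)
  delivery = SendAndForget
}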
