Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate offsets and commit before poll #862

Merged
merged 4 commits into from
Aug 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ akka.kafka.committer {
max-interval = 10s

# Parallelism for async committing
parallelism = 1
parallelism = 100
}
# // #committer-settings

Expand Down
66 changes: 31 additions & 35 deletions core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.kafka.common.{Metric, MetricName, TopicPartition}

import scala.jdk.CollectionConverters._
import scala.collection.compat._
import scala.collection.immutable
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal
Expand Down Expand Up @@ -179,6 +180,17 @@ import scala.util.control.NonFatal
}

private val oneMilli = java.time.Duration.ofMillis(1)

/**
 * Create a map containing only the highest received offset per topic-partition.
 *
 * The collected commit maps may contain several offsets for the same
 * `TopicPartition`; this folds them into one map, keeping for each partition
 * the entry with the greatest offset (later maps win on ties).
 *
 * @param cm the collected commit maps, in any order
 * @return one map with the maximum offset seen for every topic-partition
 */
private[internal] def aggregateOffsets(
    cm: List[Map[TopicPartition, OffsetAndMetadata]]
): Map[TopicPartition, OffsetAndMetadata] =
  cm.foldLeft(Map.empty[TopicPartition, OffsetAndMetadata]) { (aggregate, add) =>
    // keep an entry from `add` unless the aggregate already holds a strictly higher offset
    val higherThanExisting = add.filterNot {
      case (tp, toBeAdded) => aggregate.get(tp).exists(_.offset() > toBeAdded.offset())
    }
    aggregate ++ higherThanExisting
  }
}

/**
Expand All @@ -205,6 +217,8 @@ import scala.util.control.NonFatal
private val positionTimeout = settings.getPositionTimeout

private var requests = Map.empty[ActorRef, RequestMessages]

/** ActorRefs to all stages that requested messages from this actor (removed on their termination). */
private var requestors = Set.empty[ActorRef]
private var consumer: Consumer[K, V] = _
private var subscriptions = Set.empty[SubscriptionRequest]
Expand All @@ -222,14 +236,14 @@ import scala.util.control.NonFatal
private var rebalanceInProgress = false

/**
* Keeps commit offsets during rebalances for later commit.
* Collect commit offset maps until the next poll.
*/
private var rebalanceCommitStash = Map.empty[TopicPartition, OffsetAndMetadata]
private var commitMaps = List.empty[Map[TopicPartition, OffsetAndMetadata]]

/**
* Keeps commit senders that need a reply once stashed commits are made.
*/
private var rebalanceCommitSenders = Vector.empty[ActorRef]
private var commitSenders = Vector.empty[ActorRef]

private var delayedPollInFlight = false
private var partitionAssignmentHandler: RebalanceListener = RebalanceListener.Empty
Expand Down Expand Up @@ -270,14 +284,10 @@ import scala.util.control.NonFatal
}
commitRefreshing.assignedPositions(assignedOffsets.keySet, assignedOffsets)

case Commit(offsets) if rebalanceInProgress =>
rebalanceCommitStash ++= offsets
rebalanceCommitSenders = rebalanceCommitSenders :+ sender()

case Commit(offsets) =>
commitRefreshing.add(offsets)
val replyTo = sender()
commit(offsets, replyTo ! _)
// prepending, as later received offsets most likely are higher
commitMaps = offsets :: commitMaps
commitSenders = commitSenders :+ sender()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A thing to consider would be to request an extra poll by setting up a "delayed poll in flight" as the RequestMessages do.


case s: SubscriptionRequest =>
subscriptions = subscriptions + s
Expand All @@ -296,7 +306,7 @@ import scala.util.control.NonFatal
requests = requests.updated(sender(), req)
requestors += sender()
// When many requestors, e.g. many partitions with committablePartitionedSource the
// performance is much by collecting more requests/commits before performing the poll.
// performance is much improved by collecting more requests/commits before performing the poll.
// That is done by sending a message to self, and thereby collect pending messages in mailbox.
if (requestors.size == 1)
poll()
Expand All @@ -309,6 +319,7 @@ import scala.util.control.NonFatal
commitRefreshing.committed(offsets)

case Stop =>
commitAggregatedOffsets()
if (commitsInProgress == 0) {
log.debug("Received Stop from {}, stopping", sender())
context.stop(self)
Expand Down Expand Up @@ -410,7 +421,7 @@ import scala.util.control.NonFatal
val refreshOffsets = commitRefreshing.refreshOffsets
if (refreshOffsets.nonEmpty) {
log.debug("Refreshing committed offsets: {}", refreshOffsets)
commit(refreshOffsets, context.system.deadLetters ! _)
commit(refreshOffsets, _ => ())
}
poll()
if (p.periodic)
Expand All @@ -424,8 +435,8 @@ import scala.util.control.NonFatal

def poll(): Unit = {
val currentAssignmentsJava = consumer.assignment()
val initialRebalanceInProgress = rebalanceInProgress
try {
commitAggregatedOffsets()
if (requests.isEmpty) {
// no outstanding requests so we don't expect any messages back, but we should anyway
// drive the KafkaConsumer by polling
Expand Down Expand Up @@ -463,14 +474,20 @@ import scala.util.control.NonFatal
log.error(e, "Exception when polling from consumer, stopping actor: {}", e.toString)
context.stop(self)
}
checkRebalanceState(initialRebalanceInProgress)
ennru marked this conversation as resolved.
Show resolved Hide resolved

if (stopInProgress && commitsInProgress == 0) {
log.debug("Stopping")
context.stop(self)
}
}

/**
 * Commit the highest offsets collected since the last poll and reply to every
 * stage that requested a commit in the meantime. Does nothing while a rebalance
 * is in progress (offsets keep accumulating until it has finished) or when no
 * commits were collected.
 */
private def commitAggregatedOffsets(): Unit =
  if (commitMaps.nonEmpty && !rebalanceInProgress) {
    // Capture the pending senders and aggregate before resetting the collectors,
    // so the commit reply closure is unaffected by the reset below.
    val pendingSenders = commitSenders
    val aggregated = aggregateOffsets(commitMaps)
    commitMaps = List.empty
    commitSenders = Vector.empty
    commit(aggregated, reply => pendingSenders.foreach(_ ! reply))
  }

private def commit(commitMap: Map[TopicPartition, OffsetAndMetadata], sendReply: AnyRef => Unit): Unit = {
commitRefreshing.updateRefreshDeadlines(commitMap.keySet)
commitsInProgress += 1
Expand All @@ -494,15 +511,6 @@ import scala.util.control.NonFatal
}
}
)
// When many requestors, e.g. many partitions with committablePartitionedSource the
// performance is much improved by collecting more requests/commits before performing the poll.
// That is done by sending a message to self, and thereby collect pending messages in mailbox.
if (requestors.size == 1)
poll()
else if (!delayedPollInFlight) {
delayedPollInFlight = true
self ! delayedPollMsg
}
}

private def processResult(partitionsToFetch: Set[TopicPartition], rawResult: ConsumerRecords[K, V]): Unit =
Expand Down Expand Up @@ -595,18 +603,6 @@ import scala.util.control.NonFatal
)
}

/**
 * Detects state changes of [[rebalanceInProgress]] and takes action on it:
 * when a rebalance that was in progress at the start of the surrounding call
 * has completed, commit the offsets stashed during the rebalance and reply
 * to the senders whose commits were stashed.
 *
 * @param initialRebalanceInProgress whether a rebalance was in progress when the caller started
 */
private def checkRebalanceState(initialRebalanceInProgress: Boolean): Unit =
  if (initialRebalanceInProgress && !rebalanceInProgress && rebalanceCommitSenders.nonEmpty) {
    log.debug("committing stash {} replying to {}", rebalanceCommitStash, rebalanceCommitSenders)
    // capture the senders before they are reset below, so the reply closure
    // passed to commit (presumably invoked asynchronously) still sees them
    val replyTo = rebalanceCommitSenders
    commit(rebalanceCommitStash, msg => replyTo.foreach(_ ! msg))
    rebalanceCommitStash = Map.empty
    rebalanceCommitSenders = Vector.empty
  }

/**
* Copied from the implemented interface: "
* These methods will be called after the partition re-assignment completes and before the
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/scaladsl/Committer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object Committer {
Flow[Committable]
.groupedWeightedWithin(settings.maxBatch, settings.maxInterval)(_.batchSize)
.map(CommittableOffsetBatch.apply)
.mapAsync(settings.parallelism) { b =>
.mapAsyncUnordered(settings.parallelism) { b =>
b.commitScaladsl().map(_ => b)(ExecutionContexts.sameThreadExecutionContext)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
Await.result(control.shutdown(), remainingOrDefault)
}

it should "call commitAsync for every commit message (no commit batching)" in assertAllStagesStopped {
it should "collect commits to be sent in one commitAsync" in assertAllStagesStopped {
val commitLog = new ConsumerMock.LogHandler()
val mock = new ConsumerMock[K, V](commitLog)
val (control, probe) = createCommittableSource(mock.mock)
Expand All @@ -213,7 +213,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
val done = Future.sequence(probe.expectNextN(100).map(_.committableOffset.commitScaladsl()))

awaitAssert {
commitLog.calls should have size (100)
commitLog.calls should have size (1)
}

//emulate commit
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (C) 2014 - 2016 Softwaremill <http://softwaremill.com>
* Copyright (C) 2016 - 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.kafka.internal

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.OffsetFetchResponse
import org.scalatest.{Matchers, WordSpecLike}

/** Unit tests for [[KafkaConsumerActor.aggregateOffsets]]. */
class OffsetAggregationSpec extends WordSpecLike with Matchers {

  val topicA = "topicA"
  val topicB = "topicB"

  // Shorthand for building a commit map from ((topic, partition), offset) pairs.
  private def offsets(entries: ((String, Int), Long)*): Map[TopicPartition, OffsetAndMetadata] =
    entries.map {
      case ((topic, partition), offset) =>
        new TopicPartition(topic, partition) -> new OffsetAndMetadata(offset, OffsetFetchResponse.NO_METADATA)
    }.toMap

  "aggregateOffsets" should {
    "give all offsets for one element" in {
      val single = offsets((topicA, 1) -> 12L)
      KafkaConsumerActor.aggregateOffsets(List(single)) shouldBe single
    }

    "give the highest offsets" in {
      val higher = offsets((topicA, 1) -> 42L)
      val lower = offsets((topicA, 1) -> 12L)
      KafkaConsumerActor.aggregateOffsets(List(higher, lower)) shouldBe higher
    }

    "give the highest offsets (other order)" in {
      val higher = offsets((topicA, 1) -> 42L)
      val lower = offsets((topicA, 1) -> 12L)
      KafkaConsumerActor.aggregateOffsets(List(lower, higher)) shouldBe higher
    }

    "give the highest offsets (when mixed)" in {
      val first = offsets((topicA, 1) -> 42L, (topicB, 1) -> 11L)
      val second = offsets((topicA, 1) -> 12L, (topicB, 1) -> 43L)
      KafkaConsumerActor.aggregateOffsets(List(first, second)) shouldBe
        offsets((topicA, 1) -> 42L, (topicB, 1) -> 43L)
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package akka.kafka.scaladsl

import java.util.concurrent.atomic.AtomicInteger
import java.util.function.IntUnaryOperator

import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.ProducerMessage.MultiMessage
Expand Down Expand Up @@ -49,7 +50,9 @@ class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside {
.committableSource(consumerSettings, Subscriptions.topics(topic1))
.mapAsync(10) { elem =>
elem.committableOffset.commitScaladsl().map { _ =>
committedElements.set(elem.record.value.toInt)
committedElements.updateAndGet(new IntUnaryOperator {
override def applyAsInt(operand: Int): Int = Math.max(operand, elem.record.value.toInt)
})
elem.record.value
}
}
Expand Down