Skip to content

Commit

Permalink
Extra poll after receiving commit data (#866)
Browse files Browse the repository at this point in the history
  • Loading branch information
ennru authored Aug 20, 2019
1 parent 3dabc7d commit 685be43
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 13 deletions.
22 changes: 15 additions & 7 deletions core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ import scala.util.control.NonFatal
// prepending, as later received offsets most likely are higher
commitMaps = offsets :: commitMaps
commitSenders = commitSenders :+ sender()
requestDelayedPoll()

case s: SubscriptionRequest =>
subscriptions = subscriptions + s
Expand All @@ -303,15 +304,9 @@ import scala.util.control.NonFatal
checkOverlappingRequests("RequestMessages", sender(), req.topics)
requests = requests.updated(sender(), req)
requestors += sender()
// When many requestors, e.g. many partitions with committablePartitionedSource the
// performance is much improved by collecting more requests/commits before performing the poll.
// That is done by sending a message to self, and thereby collect pending messages in mailbox.
if (requestors.size == 1)
poll()
else if (!delayedPollInFlight) {
delayedPollInFlight = true
self ! delayedPollMsg
}
else requestDelayedPoll()

case Stop =>
commitAggregatedOffsets()
Expand Down Expand Up @@ -411,6 +406,19 @@ import scala.util.control.NonFatal
def schedulePollTask(): Unit =
timers.startSingleTimer(PollTask, pollMsg, settings.pollInterval)

/**
* Sends an extra `Poll(periodic=false)` request to self.
* Enqueueing an extra poll via the actor mailbox allows other requests to be handled
* before the actual poll is executed.
* With many requestors, e.g. many partitions with `committablePartitionedSource` the
* performance is much improved by collecting more requests/commits before performing the poll.
*/
private def requestDelayedPoll(): Unit =
if (!delayedPollInFlight) {
delayedPollInFlight = true
self ! delayedPollMsg
}

private def receivePoll(p: Poll[_, _]): Unit =
if (p.target == this) {
val refreshOffsets = commitRefreshing.refreshOffsets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,26 @@ class CommittingWithMockSpec(_system: ActorSystem)
Await.result(control.shutdown(), remainingOrDefault)
}

it should "collect commits to be sent in one commitAsync" in assertAllStagesStopped {
it should "collect commits to be sent to commitAsync" in assertAllStagesStopped {
val commitLog = new ConsumerMock.LogHandler()
val mock = new ConsumerMock[K, V](commitLog)
val (control, probe) = createCommittableSource(mock.mock)
.toMat(TestSink.probe)(Keep.both)
.run()

val msgs = (1 to 100).map(createMessage)
val count = 100
val msgs = (1 to count).map(createMessage)
mock.enqueue(msgs.map(toRecord))

probe.request(100)
val done = Future.sequence(probe.expectNextN(100).map(_.committableOffset.commitScaladsl()))
probe.request(count.toLong)
val done = Future.sequence(probe.expectNextN(count.toLong).map(_.committableOffset.commitScaladsl()))

awaitAssert {
commitLog.calls should have size (1)
withClue("the commits are aggregated to a low number of calls to commitAsync:") {
awaitAssert {
val callsToCommitAsync = commitLog.calls.size
callsToCommitAsync should be >= 1
callsToCommitAsync should be < count / 10
}
}

//emulate commit
Expand Down

0 comments on commit 685be43

Please sign in to comment.