Extra poll after receiving commit data #866

Merged: 4 commits, Aug 20, 2019
22 changes: 15 additions & 7 deletions core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala
@@ -286,6 +286,7 @@ import scala.util.control.NonFatal
       // prepending, as later received offsets most likely are higher
       commitMaps = offsets :: commitMaps
       commitSenders = commitSenders :+ sender()
+      requestDelayedPoll()

     case s: SubscriptionRequest =>
       subscriptions = subscriptions + s
@@ -303,15 +304,9 @@ import scala.util.control.NonFatal
       checkOverlappingRequests("RequestMessages", sender(), req.topics)
       requests = requests.updated(sender(), req)
       requestors += sender()
-      // When many requestors, e.g. many partitions with committablePartitionedSource the
-      // performance is much improved by collecting more requests/commits before performing the poll.
-      // That is done by sending a message to self, and thereby collect pending messages in mailbox.
       if (requestors.size == 1)
         poll()
-      else if (!delayedPollInFlight) {
-        delayedPollInFlight = true
-        self ! delayedPollMsg
-      }
+      else requestDelayedPoll()

     case Stop =>
       commitAggregatedOffsets()
@@ -411,6 +406,19 @@ import scala.util.control.NonFatal
   def schedulePollTask(): Unit =
     timers.startSingleTimer(PollTask, pollMsg, settings.pollInterval)

+  /**
+   * Sends an extra `Poll(periodic=false)` request to self.
+   * Enqueueing an extra poll via the actor mailbox allows other requests to be handled
+   * before the actual poll is executed.
+   * With many requestors, e.g. many partitions with `committablePartitionedSource` the
+   * performance is much improved by collecting more requests/commits before performing the poll.
+   */
+  private def requestDelayedPoll(): Unit =
+    if (!delayedPollInFlight) {

Review comment (Contributor):
I think the flag should match the method, so extraPollInFlight or requestDelayedPoll().

+      delayedPollInFlight = true
+      self ! delayedPollMsg
+    }
+
   private def receivePoll(p: Poll[_, _]): Unit =
     if (p.target == this) {
       val refreshOffsets = commitRefreshing.refreshOffsets
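
The batching effect described in the `requestDelayedPoll` Scaladoc can be illustrated without Akka. The following is a minimal sketch, not the Alpakka Kafka implementation: `ToyConsumer`, `Commit`, `DelayedPoll`, `tell`, and `drain` are hypothetical names, the mailbox is a plain queue processed on one thread, and resetting the flag when the poll message is handled is an assumption, since that part lies outside the diff.

```scala
import scala.collection.mutable

// Hypothetical message types for illustration only (not the Alpakka Kafka ones).
sealed trait Msg
final case class Commit(offset: Long) extends Msg
case object DelayedPoll extends Msg

// A toy single-threaded "actor": messages are handled strictly in mailbox order.
final class ToyConsumer {
  private val mailbox = mutable.Queue.empty[Msg]
  private var pending = List.empty[Long]
  private var delayedPollInFlight = false

  // One entry per simulated poll, holding the commits that poll carried.
  var polls: Vector[List[Long]] = Vector.empty

  def tell(msg: Msg): Unit = mailbox.enqueue(msg)

  // Enqueue at most one poll message; it is handled after everything
  // that is already waiting in the mailbox.
  private def requestDelayedPoll(): Unit =
    if (!delayedPollInFlight) {
      delayedPollInFlight = true
      tell(DelayedPoll)
    }

  private def receive(msg: Msg): Unit = msg match {
    case Commit(offset) =>
      pending = offset :: pending
      requestDelayedPoll()
    case DelayedPoll =>
      // Assumption: the flag is cleared when the poll message is processed.
      delayedPollInFlight = false
      polls = polls :+ pending.reverse
      pending = Nil
  }

  // Process messages until the mailbox is empty.
  def drain(): Unit = while (mailbox.nonEmpty) receive(mailbox.dequeue())
}

object Demo extends App {
  val consumer = new ToyConsumer
  (1L to 3L).foreach(o => consumer.tell(Commit(o)))
  consumer.drain()
  println(consumer.polls) // prints Vector(List(1, 2, 3))
}
```

In this sketch the three commits are already queued when the first one is handled, so the single `DelayedPoll` lands behind them and one poll carries all three. Polling directly inside the `Commit` handler would instead produce one poll per commit.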
@@ -199,21 +199,26 @@ class CommittingWithMockSpec(_system: ActorSystem)
     Await.result(control.shutdown(), remainingOrDefault)
   }

-  it should "collect commits to be sent in one commitAsync" in assertAllStagesStopped {
+  it should "collect commits to be sent to commitAsync" in assertAllStagesStopped {
     val commitLog = new ConsumerMock.LogHandler()
     val mock = new ConsumerMock[K, V](commitLog)
     val (control, probe) = createCommittableSource(mock.mock)
       .toMat(TestSink.probe)(Keep.both)
       .run()

-    val msgs = (1 to 100).map(createMessage)
+    val count = 100
+    val msgs = (1 to count).map(createMessage)
     mock.enqueue(msgs.map(toRecord))

-    probe.request(100)
-    val done = Future.sequence(probe.expectNextN(100).map(_.committableOffset.commitScaladsl()))
+    probe.request(count.toLong)
+    val done = Future.sequence(probe.expectNextN(count.toLong).map(_.committableOffset.commitScaladsl()))

-    awaitAssert {
-      commitLog.calls should have size (1)
+    withClue("the commits are aggregated to a low number of calls to commitAsync:") {
+      awaitAssert {
+        val callsToCommitAsync = commitLog.calls.size
+        callsToCommitAsync should be >= 1
+        callsToCommitAsync should be < count / 10
+      }
     }

     //emulate commit