From 685be43cdb057e959c5ec0dd29e3224a16cf269a Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Tue, 20 Aug 2019 17:41:39 +0200 Subject: [PATCH] Extra poll after receiving commit data (#866) --- .../kafka/internal/KafkaConsumerActor.scala | 22 +++++++++++++------ .../internal/CommittingWithMockSpec.scala | 17 +++++++++----- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala index 058fea7d7..72a894aa7 100644 --- a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala +++ b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala @@ -286,6 +286,7 @@ import scala.util.control.NonFatal // prepending, as later received offsets most likely are higher commitMaps = offsets :: commitMaps commitSenders = commitSenders :+ sender() + requestDelayedPoll() case s: SubscriptionRequest => subscriptions = subscriptions + s @@ -303,15 +304,9 @@ import scala.util.control.NonFatal checkOverlappingRequests("RequestMessages", sender(), req.topics) requests = requests.updated(sender(), req) requestors += sender() - // When many requestors, e.g. many partitions with committablePartitionedSource the - // performance is much improved by collecting more requests/commits before performing the poll. - // That is done by sending a message to self, and thereby collect pending messages in mailbox. if (requestors.size == 1) poll() - else if (!delayedPollInFlight) { - delayedPollInFlight = true - self ! delayedPollMsg - } + else requestDelayedPoll() case Stop => commitAggregatedOffsets() @@ -411,6 +406,19 @@ import scala.util.control.NonFatal def schedulePollTask(): Unit = timers.startSingleTimer(PollTask, pollMsg, settings.pollInterval) + /** + * Sends an extra `Poll(periodic=false)` request to self. + * Enqueueing an extra poll via the actor mailbox allows other requests to be handled + * before the actual poll is executed. + * With many requestors, e.g. many partitions with `committablePartitionedSource` the + * performance is much improved by collecting more requests/commits before performing the poll. + */ + private def requestDelayedPoll(): Unit = + if (!delayedPollInFlight) { + delayedPollInFlight = true + self ! delayedPollMsg + } + private def receivePoll(p: Poll[_, _]): Unit = if (p.target == this) { val refreshOffsets = commitRefreshing.refreshOffsets diff --git a/tests/src/test/scala/akka/kafka/internal/CommittingWithMockSpec.scala b/tests/src/test/scala/akka/kafka/internal/CommittingWithMockSpec.scala index 89a184cf4..40e722df4 100644 --- a/tests/src/test/scala/akka/kafka/internal/CommittingWithMockSpec.scala +++ b/tests/src/test/scala/akka/kafka/internal/CommittingWithMockSpec.scala @@ -199,21 +199,26 @@ class CommittingWithMockSpec(_system: ActorSystem) Await.result(control.shutdown(), remainingOrDefault) } - it should "collect commits to be sent in one commitAsync" in assertAllStagesStopped { + it should "collect commits to be sent to commitAsync" in assertAllStagesStopped { val commitLog = new ConsumerMock.LogHandler() val mock = new ConsumerMock[K, V](commitLog) val (control, probe) = createCommittableSource(mock.mock) .toMat(TestSink.probe)(Keep.both) .run() - val msgs = (1 to 100).map(createMessage) + val count = 100 + val msgs = (1 to count).map(createMessage) mock.enqueue(msgs.map(toRecord)) - probe.request(100) - val done = Future.sequence(probe.expectNextN(100).map(_.committableOffset.commitScaladsl())) + probe.request(count.toLong) + val done = Future.sequence(probe.expectNextN(count.toLong).map(_.committableOffset.commitScaladsl())) - awaitAssert { - commitLog.calls should have size (1) + withClue("the commits are aggregated to a low number of calls to commitAsync:") { + awaitAssert { + val callsToCommitAsync = commitLog.calls.size + callsToCommitAsync should be >= 1 + callsToCommitAsync should be < count / 10 + } } //emulate commit