Skip to content

Commit

Permalink
Synchronous partition assignment handler (internal) (#761)
Browse files Browse the repository at this point in the history
Synchronous partition assignment handler (internal)
  • Loading branch information
2m authored Jul 25, 2019
2 parents 91a6ed9 + 13d015b commit 220022c
Show file tree
Hide file tree
Showing 19 changed files with 447 additions and 90 deletions.
18 changes: 13 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,19 @@ lazy val tests = project
"org.slf4j" % "jul-to-slf4j" % slf4jVersion % Test,
"org.mockito" % "mockito-core" % "2.24.5" % Test
) ++ {
if (scalaBinaryVersion.value == "2.13") Seq()
else
Seq(
"io.github.embeddedkafka" %% "embedded-kafka-schema-registry" % embeddedKafkaSchemaRegistry % Test exclude ("log4j", "log4j") exclude ("org.slf4j", "slf4j-log4j12")
)
scalaBinaryVersion.value match {
case "2.13" =>
Seq()
case "2.12" =>
Seq(
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion % Test,
"io.github.embeddedkafka" %% "embedded-kafka-schema-registry" % embeddedKafkaSchemaRegistry % Test exclude ("log4j", "log4j") exclude ("org.slf4j", "slf4j-log4j12")
)
case "2.11" =>
Seq(
"io.github.embeddedkafka" %% "embedded-kafka-schema-registry" % embeddedKafkaSchemaRegistry % Test exclude ("log4j", "log4j") exclude ("org.slf4j", "slf4j-log4j12")
)
}
} ++
Seq( // integration test dependencies
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % IntegrationTest,
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ akka.kafka.consumer {
# Used in the transactional flow for exactly-once-semantics processing.
eos-draining-check-interval = 30ms

# Issue warnings when a call to a partition assignment handler method takes
# longer than this.
partition-handler-warning = 5s

# Settings for checking the connection to the Kafka broker. Connection checking uses `listTopics` requests with the timeout
# configured by `consumer.metadata-request-timeout`
connection-checker {
Expand Down
27 changes: 21 additions & 6 deletions core/src/main/scala/akka/kafka/ConsumerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ object ConsumerSettings {
val metadataRequestTimeout = config.getDuration("metadata-request-timeout").asScala
val drainingCheckInterval = config.getDuration("eos-draining-check-interval").asScala
val connectionCheckerSettings = ConnectionCheckerSettings(config.getConfig(ConnectionCheckerSettings.configPath))
val partitionHandlerWarning = config.getDuration("partition-handler-warning").asScala

new ConsumerSettings[K, V](
properties,
Expand All @@ -91,7 +92,8 @@ object ConsumerSettings {
metadataRequestTimeout,
drainingCheckInterval,
ConsumerSettings.createKafkaConsumer,
connectionCheckerSettings
connectionCheckerSettings,
partitionHandlerWarning
)
}

Expand Down Expand Up @@ -203,7 +205,8 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
val metadataRequestTimeout: FiniteDuration,
val drainingCheckInterval: FiniteDuration,
val consumerFactory: ConsumerSettings[K, V] => Consumer[K, V],
val connectionCheckerSettings: ConnectionCheckerSettings
val connectionCheckerSettings: ConnectionCheckerSettings,
val partitionHandlerWarning: FiniteDuration
) {

@deprecated("use the factory methods `ConsumerSettings.apply` and `create` instead", "1.0-M1")
Expand Down Expand Up @@ -239,7 +242,8 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
metadataRequestTimeout = 5.seconds,
drainingCheckInterval = 30.millis,
consumerFactory = ConsumerSettings.createKafkaConsumer[K, V],
connectionCheckerSettings = ConnectionCheckerSettings.Disabled
connectionCheckerSettings = ConnectionCheckerSettings.Disabled,
partitionHandlerWarning = 15.seconds
)

/**
Expand Down Expand Up @@ -495,6 +499,14 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
def withDrainingCheckInterval(drainingCheckInterval: java.time.Duration): ConsumerSettings[K, V] =
copy(drainingCheckInterval = drainingCheckInterval.asScala)

/** Scala API */
def withPartitionHandlerWarning(partitionHandlerWarning: FiniteDuration): ConsumerSettings[K, V] =
copy(partitionHandlerWarning = partitionHandlerWarning)

/** Java API */
def withPartitionHandlerWarning(partitionHandlerWarning: java.time.Duration): ConsumerSettings[K, V] =
copy(partitionHandlerWarning = partitionHandlerWarning.asScala)

/**
* Replaces the default Kafka consumer creation logic.
*/
Expand Down Expand Up @@ -530,7 +542,8 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
metadataRequestTimeout: FiniteDuration = metadataRequestTimeout,
drainingCheckInterval: FiniteDuration = drainingCheckInterval,
consumerFactory: ConsumerSettings[K, V] => Consumer[K, V] = consumerFactory,
connectionCheckerConfig: ConnectionCheckerSettings = connectionCheckerSettings
connectionCheckerConfig: ConnectionCheckerSettings = connectionCheckerSettings,
partitionHandlerWarning: FiniteDuration = partitionHandlerWarning
): ConsumerSettings[K, V] =
new ConsumerSettings[K, V](
properties,
Expand All @@ -550,7 +563,8 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
metadataRequestTimeout,
drainingCheckInterval,
consumerFactory,
connectionCheckerConfig
connectionCheckerConfig,
partitionHandlerWarning
)

/**
Expand All @@ -574,6 +588,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
s"waitClosePartition=${waitClosePartition.toCoarsest}," +
s"metadataRequestTimeout=${metadataRequestTimeout.toCoarsest}," +
s"drainingCheckInterval=${drainingCheckInterval.toCoarsest}," +
s"connectionCheckerSettings=$connectionCheckerSettings" +
s"connectionCheckerSettings=$connectionCheckerSettings," +
s"partitionHandlerWarning=${partitionHandlerWarning.toCoarsest}" +
")"
}
56 changes: 56 additions & 0 deletions core/src/main/scala/akka/kafka/RestrictedConsumer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (C) 2014 - 2016 Softwaremill <http://softwaremill.com>
* Copyright (C) 2016 - 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.kafka

import akka.annotation.ApiMayChange
import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

/**
* Offers parts of the [[org.apache.kafka.clients.consumer.Consumer]] API which becomes available to
* the [[akka.kafka.scaladsl.PartitionAssignmentHandler]] callbacks.
*/
@ApiMayChange
final class RestrictedConsumer(consumer: Consumer[_, _], duration: java.time.Duration) {

/**
* See [[org.apache.kafka.clients.consumer.KafkaConsumer#assignment]]
*/
def assignment(): java.util.Set[TopicPartition] = consumer.assignment()

/**
* See [[org.apache.kafka.clients.consumer.KafkaConsumer#beginningOffsets()]]
*/
def beginningOffsets(tps: java.util.Collection[TopicPartition]): java.util.Map[TopicPartition, java.lang.Long] =
consumer.beginningOffsets(tps, duration)

/**
* See [[org.apache.kafka.clients.consumer.KafkaConsumer#commitSync(Map, java.time.Duration)]]
*/
def commitSync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit =
consumer.commitSync(offsets, duration)

/**
* See [[org.apache.kafka.clients.consumer.KafkaConsumer#committed(TopicPartition, Duration)]]
*/
def committed(tp: TopicPartition): OffsetAndMetadata = consumer.committed(tp, duration)

/**
* See [[org.apache.kafka.clients.consumer.KafkaConsumer#endOffsets(java.util.Collection[TopicPartition], java.time.Duration)]]
*/
def endOffsets(tps: java.util.Collection[TopicPartition]): java.util.Map[TopicPartition, java.lang.Long] =
consumer.endOffsets(tps, duration)

/**
* See [[org.apache.kafka.clients.consumer.KafkaConsumer#position(TopicPartition, java.time.Duration)]]
*/
def position(tp: TopicPartition): Long = consumer.position(tp, duration)

/**
* See [[org.apache.kafka.clients.consumer.KafkaConsumer#seek(TopicPartition, Long)]]
*/
def seek(tp: TopicPartition, offset: Long): Unit = consumer.seek(tp, offset)
}
12 changes: 8 additions & 4 deletions core/src/main/scala/akka/kafka/Subscriptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ object Subscriptions {
private[kafka] final case class TopicSubscription(tps: Set[String], rebalanceListener: Option[ActorRef])
extends AutoSubscription {
def withRebalanceListener(ref: ActorRef): TopicSubscription =
TopicSubscription(tps, Some(ref))
copy(rebalanceListener = Some(ref))

def renderStageAttribute: String = s"${tps.mkString(" ")}$renderListener"
}

Expand All @@ -70,7 +71,8 @@ object Subscriptions {
private[kafka] final case class TopicSubscriptionPattern(pattern: String, rebalanceListener: Option[ActorRef])
extends AutoSubscription {
def withRebalanceListener(ref: ActorRef): TopicSubscriptionPattern =
TopicSubscriptionPattern(pattern, Some(ref))
copy(rebalanceListener = Some(ref))

def renderStageAttribute: String = s"pattern $pattern$renderListener"
}

Expand Down Expand Up @@ -99,7 +101,8 @@ object Subscriptions {
}

/** Creates subscription for given set of topics */
def topics(ts: Set[String]): AutoSubscription = TopicSubscription(ts, None)
def topics(ts: Set[String]): AutoSubscription =
TopicSubscription(ts, rebalanceListener = None)

/**
* JAVA API
Expand All @@ -117,7 +120,8 @@ object Subscriptions {
/**
* Creates subscription for given topics pattern
*/
def topicPattern(pattern: String): AutoSubscription = TopicSubscriptionPattern(pattern, None)
def topicPattern(pattern: String): AutoSubscription =
TopicSubscriptionPattern(pattern, rebalanceListener = None)

/**
* Manually assign given topics and partitions
Expand Down
102 changes: 60 additions & 42 deletions core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import akka.util.JavaDurationConverters._
import akka.event.LoggingReceive
import akka.kafka.KafkaConsumerActor.StoppingException
import akka.kafka._
import akka.stream.stage.AsyncCallback
import akka.kafka.scaladsl.PartitionAssignmentHandler
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}

Expand All @@ -51,12 +51,12 @@ import scala.util.control.NonFatal
final case class AssignWithOffset(tps: Map[TopicPartition, Long]) extends NoSerializationVerificationNeeded
final case class AssignOffsetsForTimes(timestampsToSearch: Map[TopicPartition, Long])
extends NoSerializationVerificationNeeded
final case class Subscribe(topics: Set[String], listener: ListenerCallbacks)
final case class Subscribe(topics: Set[String], rebalanceHandler: PartitionAssignmentHandler)
extends SubscriptionRequest
with NoSerializationVerificationNeeded
case object RequestMetrics extends NoSerializationVerificationNeeded
// Could be optimized to contain a Pattern as it used during reconciliation now, tho only in exceptional circumstances
final case class SubscribePattern(pattern: String, listener: ListenerCallbacks)
final case class SubscribePattern(pattern: String, rebalanceHandler: PartitionAssignmentHandler)
extends SubscriptionRequest
with NoSerializationVerificationNeeded
final case class Seek(tps: Map[TopicPartition, Long]) extends NoSerializationVerificationNeeded
Expand Down Expand Up @@ -89,36 +89,6 @@ import scala.util.control.NonFatal

}

final case class ListenerCallbacks(onAssign: Set[TopicPartition] => Unit, onRevoke: Set[TopicPartition] => Unit)
extends NoSerializationVerificationNeeded

object ListenerCallbacks {
def apply(subscription: AutoSubscription,
sourceActor: ActorRef,
partitionAssignedCB: AsyncCallback[Set[TopicPartition]],
partitionRevokedCB: AsyncCallback[Set[TopicPartition]],
revokedBlockingCallback: Set[TopicPartition] => Unit = _ => ()): ListenerCallbacks =
KafkaConsumerActor.ListenerCallbacks(
assignedTps => {
subscription.rebalanceListener.foreach {
_.tell(TopicPartitionsAssigned(subscription, assignedTps), sourceActor)
}
if (assignedTps.nonEmpty) {
partitionAssignedCB.invoke(assignedTps)
}
},
revokedTps => {
subscription.rebalanceListener.foreach {
_.tell(TopicPartitionsRevoked(subscription, revokedTps), sourceActor)
}
if (revokedTps.nonEmpty) {
partitionRevokedCB.invoke(revokedTps)
}
revokedBlockingCallback(revokedTps)
}
)
}

private[KafkaConsumerActor] trait CommitRefreshing {
def add(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit
def committed(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit
Expand Down Expand Up @@ -231,7 +201,7 @@ import scala.util.control.NonFatal
/** Limits the blocking on offsetForTimes */
private val offsetForTimesTimeout = settings.getOffsetForTimesTimeout

/** Limits the blocking on position in [[WrappedAutoPausedListener]] */
/** Limits the blocking on position in [[RebalanceListenerImpl]] */
private val positionTimeout = settings.getPositionTimeout

private var requests = Map.empty[ActorRef, RequestMessages]
Expand All @@ -247,7 +217,7 @@ import scala.util.control.NonFatal

/**
* While `true`, committing is delayed.
* Changed by `onPartitionsRevoked` and `onPartitionsAssigned` in [[WrappedAutoPausedListener]].
* Changed by `onPartitionsRevoked` and `onPartitionsAssigned` in [[RebalanceListenerImpl]].
*/
private var rebalanceInProgress = false

Expand All @@ -262,6 +232,7 @@ import scala.util.control.NonFatal
private var rebalanceCommitSenders = Vector.empty[ActorRef]

private var delayedPollInFlight = false
private var partitionAssignmentHandler: RebalanceListener = RebalanceListener.Empty

def receive: Receive = LoggingReceive {
case Assign(assignedTps) =>
Expand Down Expand Up @@ -366,10 +337,14 @@ import scala.util.control.NonFatal
def handleSubscription(subscription: SubscriptionRequest): Unit =
try {
subscription match {
case Subscribe(topics, listener) =>
consumer.subscribe(topics.toList.asJava, new WrappedAutoPausedListener(listener))
case SubscribePattern(pattern, listener) =>
consumer.subscribe(Pattern.compile(pattern), new WrappedAutoPausedListener(listener))
case Subscribe(topics, rebalanceHandler) =>
val callback = new RebalanceListenerImpl(rebalanceHandler)
partitionAssignmentHandler = callback
consumer.subscribe(topics.toList.asJava, callback)
case SubscribePattern(pattern, rebalanceHandler) =>
val callback = new RebalanceListenerImpl(rebalanceHandler)
partitionAssignmentHandler = callback
consumer.subscribe(Pattern.compile(pattern), callback)
}

scheduleFirstPollTask()
Expand Down Expand Up @@ -419,6 +394,7 @@ import scala.util.control.NonFatal
case (ref, req) =>
ref ! Messages(req.requestId, Iterator.empty)
}
partitionAssignmentHandler.postStop()
consumer.close(settings.getCloseTimeout)
super.postStop()
}
Expand Down Expand Up @@ -644,24 +620,66 @@ import scala.util.control.NonFatal
* So these methods are always called on the same thread as the actor and we're safe to
* touch internal state.
*/
private final class WrappedAutoPausedListener(listener: ListenerCallbacks)
private[KafkaConsumerActor] sealed trait RebalanceListener
extends ConsumerRebalanceListener
with NoSerializationVerificationNeeded {
override def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]): Unit
override def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]): Unit
def postStop(): Unit = ()
}

private[KafkaConsumerActor] object RebalanceListener {
object Empty extends RebalanceListener {
override def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]): Unit = ()

override def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]): Unit = ()

override def postStop(): Unit = ()
}
}

private[KafkaConsumerActor] final class RebalanceListenerImpl(
partitionAssignmentHandler: PartitionAssignmentHandler
) extends RebalanceListener {

private val restrictedConsumer = new RestrictedConsumer(consumer, settings.partitionHandlerWarning.*(0.95d).asJava)
private val warningDuration = settings.partitionHandlerWarning.toNanos

override def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]): Unit = {
consumer.pause(partitions)
val tps = partitions.asScala.toSet
commitRefreshing.assignedPositions(tps, consumer, positionTimeout)
listener.onAssign(tps)
val startTime = System.nanoTime()
partitionAssignmentHandler.onAssign(tps, restrictedConsumer)
checkDuration(startTime, "onAssign")
rebalanceInProgress = false
}

override def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]): Unit = {
val revokedTps = partitions.asScala.toSet
listener.onRevoke(revokedTps)
val startTime = System.nanoTime()
partitionAssignmentHandler.onRevoke(revokedTps, restrictedConsumer)
checkDuration(startTime, "onRevoke")
commitRefreshing.revoke(revokedTps)
rebalanceInProgress = true
}

override def postStop(): Unit = {
val currentTps = consumer.assignment()
consumer.pause(currentTps)
val startTime = System.nanoTime()
partitionAssignmentHandler.onStop(currentTps.asScala.toSet, restrictedConsumer)
checkDuration(startTime, "onStop")
}

private def checkDuration(startTime: Long, method: String): Unit = {
val duration = System.nanoTime() - startTime
if (duration > warningDuration) {
log.warning("Partition assignment handler `{}` took longer than `partition-handler-warning`: {} ms",
method,
duration / 1000000L)
}
}
}

}
Loading

0 comments on commit 220022c

Please sign in to comment.