Skip to content

Commit

Permalink
Remove rebalance listener for manual subscriptions (#639)
Browse files Browse the repository at this point in the history
While digging around for #539 I noticed that the manual subscription case classes allow setting the actor to receive rebalance notifications. Manual subscriptions do never rebalance, so I deprecated those methods and made them no-ops for now.
  • Loading branch information
ennru authored and 2m committed Dec 7, 2018
1 parent be6a8b3 commit 3bd8b9a
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 38 deletions.
18 changes: 18 additions & 0 deletions core/src/main/mima-filters/1.0-M1.backwards.excludes
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,21 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.CommitterSettings
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.Metadata#CommittedOffset.*")
ProblemFilters.exclude[MissingTypesProblem]("akka.kafka.Metadata$CommittedOffset$")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.Metadata#CommittedOffset.apply")

# PR #639 Remove rebalance listener for manual subscriptions
# https://github.com/akka/alpakka-kafka/pull/639
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.Subscriptions#Assignment.copy$default$2")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.Subscriptions#Assignment.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.Subscriptions#Assignment.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.Subscriptions#Assignment.apply")
ProblemFilters.exclude[MissingTypesProblem]("akka.kafka.Subscriptions$Assignment$")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.Subscriptions#AssignmentWithOffset.copy$default$2")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.Subscriptions#AssignmentWithOffset.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.Subscriptions#AssignmentWithOffset.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.Subscriptions#AssignmentWithOffset.apply")
ProblemFilters.exclude[MissingTypesProblem]("akka.kafka.Subscriptions$AssignmentWithOffset$")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.Subscriptions#AssignmentOffsetsForTimes.copy$default$2")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.Subscriptions#AssignmentOffsetsForTimes.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.Subscriptions#AssignmentOffsetsForTimes.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.Subscriptions#AssignmentOffsetsForTimes.apply")
ProblemFilters.exclude[MissingTypesProblem]("akka.kafka.Subscriptions$AssignmentOffsetsForTimes$")
59 changes: 33 additions & 26 deletions core/src/main/scala/akka/kafka/Subscriptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,31 @@ sealed trait Subscription {

def renderStageAttribute: String

protected def renderListener: String =
rebalanceListener match {
case Some(ref) => s" rebalanceListener $ref"
case None => ""
}
protected def renderListener: String = ""
}
sealed trait ManualSubscription extends Subscription {

/** @deprecated Manual subscriptions do never rebalance */
@deprecated("Manual subscription does never rebalance", "1.0-M2")
def rebalanceListener: Option[ActorRef] = None

/** @deprecated Manual subscriptions do never rebalance */
@deprecated("Manual subscription does never rebalance", "1.0-M2")
def withRebalanceListener(ref: ActorRef): ManualSubscription
}
sealed trait AutoSubscription extends Subscription {

/** ActorRef which is to receive [[akka.kafka.ConsumerRebalanceEvent]] signals when rebalancing happens */
def rebalanceListener: Option[ActorRef]

/** Configure this actor ref to receive [[akka.kafka.ConsumerRebalanceEvent]] signals */
def withRebalanceListener(ref: ActorRef): AutoSubscription

override protected def renderListener: String =
rebalanceListener match {
case Some(ref) => s" rebalanceListener $ref"
case None => ""
}
}

sealed trait ConsumerRebalanceEvent
Expand Down Expand Up @@ -62,33 +76,26 @@ object Subscriptions {

/** INTERNAL API */
@akka.annotation.InternalApi
private[kafka] final case class Assignment(tps: Set[TopicPartition], rebalanceListener: Option[ActorRef])
extends ManualSubscription {
def withRebalanceListener(ref: ActorRef): Assignment =
Assignment(tps, Some(ref))
def renderStageAttribute: String = s"${tps.mkString(" ")}$renderListener"
private[kafka] final case class Assignment(tps: Set[TopicPartition]) extends ManualSubscription {
def withRebalanceListener(ref: ActorRef): Assignment = this
def renderStageAttribute: String = s"${tps.mkString(" ")}"
}

/** INTERNAL API */
@akka.annotation.InternalApi
private[kafka] final case class AssignmentWithOffset(tps: Map[TopicPartition, Long],
rebalanceListener: Option[ActorRef])
extends ManualSubscription {
def withRebalanceListener(ref: ActorRef): AssignmentWithOffset =
AssignmentWithOffset(tps, Some(ref))
private[kafka] final case class AssignmentWithOffset(tps: Map[TopicPartition, Long]) extends ManualSubscription {
def withRebalanceListener(ref: ActorRef): AssignmentWithOffset = this
def renderStageAttribute: String =
s"${tps.map { case (tp, offset) => s"$tp offset$offset" }.mkString(" ")}$renderListener"
s"${tps.map { case (tp, offset) => s"$tp offset$offset" }.mkString(" ")}"
}

/** INTERNAL API */
@akka.annotation.InternalApi
private[kafka] final case class AssignmentOffsetsForTimes(timestampsToSearch: Map[TopicPartition, Long],
rebalanceListener: Option[ActorRef])
private[kafka] final case class AssignmentOffsetsForTimes(timestampsToSearch: Map[TopicPartition, Long])
extends ManualSubscription {
def withRebalanceListener(ref: ActorRef): AssignmentOffsetsForTimes =
AssignmentOffsetsForTimes(timestampsToSearch, Some(ref))
def withRebalanceListener(ref: ActorRef): AssignmentOffsetsForTimes = this
def renderStageAttribute: String =
s"${timestampsToSearch.map { case (tp, timestamp) => s"$tp timestamp$timestamp" }.mkString(" ")}$renderListener"
s"${timestampsToSearch.map { case (tp, timestamp) => s"$tp timestamp$timestamp" }.mkString(" ")}"
}

/** Creates subscription for given set of topics */
Expand All @@ -115,7 +122,7 @@ object Subscriptions {
/**
* Manually assign given topics and partitions
*/
def assignment(tps: Set[TopicPartition]): ManualSubscription = Assignment(tps, None)
def assignment(tps: Set[TopicPartition]): ManualSubscription = Assignment(tps)

/**
* JAVA API
Expand All @@ -133,12 +140,12 @@ object Subscriptions {
/**
* Manually assign given topics and partitions with offsets
*/
def assignmentWithOffset(tps: Map[TopicPartition, Long]): ManualSubscription = AssignmentWithOffset(tps, None)
def assignmentWithOffset(tps: Map[TopicPartition, Long]): ManualSubscription = AssignmentWithOffset(tps)

/**
* Manually assign given topics and partitions with offsets
*/
def assignmentWithOffset(tps: (TopicPartition, Long)*): ManualSubscription = AssignmentWithOffset(tps.toMap, None)
def assignmentWithOffset(tps: (TopicPartition, Long)*): ManualSubscription = AssignmentWithOffset(tps.toMap)

/**
* JAVA API
Expand All @@ -157,13 +164,13 @@ object Subscriptions {
* Manually assign given topics and partitions with timestamps
*/
def assignmentOffsetsForTimes(tps: Map[TopicPartition, Long]): ManualSubscription =
AssignmentOffsetsForTimes(tps, None)
AssignmentOffsetsForTimes(tps)

/**
* Manually assign given topics and partitions with timestamps
*/
def assignmentOffsetsForTimes(tps: (TopicPartition, Long)*): ManualSubscription =
AssignmentOffsetsForTimes(tps.toMap, None)
AssignmentOffsetsForTimes(tps.toMap)

/**
* JAVA API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ import scala.concurrent.{ExecutionContext, Future}
protected def configureSubscription(): Unit

protected def configureManualSubscription(subscription: ManualSubscription): Unit = subscription match {
case Assignment(topics, _) =>
case Assignment(topics) =>
consumerActor.tell(KafkaConsumerActor.Internal.Assign(topics), sourceActor.ref)
tps ++= topics
case AssignmentWithOffset(topics, _) =>
case AssignmentWithOffset(topics) =>
consumerActor.tell(KafkaConsumerActor.Internal.AssignWithOffset(topics), sourceActor.ref)
tps ++= topics.keySet
case AssignmentOffsetsForTimes(topics, _) =>
case AssignmentOffsetsForTimes(topics) =>
consumerActor.tell(KafkaConsumerActor.Internal.AssignOffsetsForTimes(topics), sourceActor.ref)
tps ++= topics.keySet
}
Expand Down
19 changes: 10 additions & 9 deletions core/src/main/scala/akka/kafka/internal/SingleSourceLogic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import scala.concurrent.{Future, Promise}

final def configureSubscription(): Unit = {

def rebalanceListener: KafkaConsumerActor.ListenerCallbacks = {
def rebalanceListener(autoSubscription: AutoSubscription): KafkaConsumerActor.ListenerCallbacks = {
val partitionAssignedCB = getAsyncCallback[Set[TopicPartition]] { assignedTps =>
tps ++= assignedTps
log.log(partitionLogLevel, "Assigned partitions: {}. All partitions: {}", assignedTps, tps)
Expand All @@ -50,16 +50,16 @@ import scala.concurrent.{Future, Promise}

KafkaConsumerActor.ListenerCallbacks(
assignedTps => {
subscription.rebalanceListener.foreach {
_.tell(TopicPartitionsAssigned(subscription, assignedTps), sourceActor.ref)
autoSubscription.rebalanceListener.foreach {
_.tell(TopicPartitionsAssigned(autoSubscription, assignedTps), sourceActor.ref)
}
if (assignedTps.nonEmpty) {
partitionAssignedCB.invoke(assignedTps)
}
},
revokedTps => {
subscription.rebalanceListener.foreach {
_.tell(TopicPartitionsRevoked(subscription, revokedTps), sourceActor.ref)
autoSubscription.rebalanceListener.foreach {
_.tell(TopicPartitionsRevoked(autoSubscription, revokedTps), sourceActor.ref)
}
if (revokedTps.nonEmpty) {
partitionRevokedCB.invoke(revokedTps)
Expand All @@ -69,10 +69,11 @@ import scala.concurrent.{Future, Promise}
}

subscription match {
case TopicSubscription(topics, _) =>
consumerActor.tell(KafkaConsumerActor.Internal.Subscribe(topics, rebalanceListener), sourceActor.ref)
case TopicSubscriptionPattern(topics, _) =>
consumerActor.tell(KafkaConsumerActor.Internal.SubscribePattern(topics, rebalanceListener), sourceActor.ref)
case sub @ TopicSubscription(topics, _) =>
consumerActor.tell(KafkaConsumerActor.Internal.Subscribe(topics, rebalanceListener(sub)), sourceActor.ref)
case sub @ TopicSubscriptionPattern(topics, _) =>
consumerActor.tell(KafkaConsumerActor.Internal.SubscribePattern(topics, rebalanceListener(sub)),
sourceActor.ref)
case s: ManualSubscription => configureManualSubscription(s)
}

Expand Down

0 comments on commit 3bd8b9a

Please sign in to comment.