Skip to content

Commit

Permalink
[SPARK-4964] pass leaders as a map to ensure 1 leader per TopicPartition
Browse files Browse the repository at this point in the history
  • Loading branch information
koeninger committed Feb 10, 2015
1 parent 50820f1 commit e9cece4
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,54 @@

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition

import org.apache.spark.annotation.Experimental

/**
* :: Experimental ::
* Represent the host info for the leader of a Kafka partition.
* Represent the host and port info for a Kafka broker.
* Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID
*/
@Experimental
final class Leader private(
/** Kafka topic name */
val topic: String,
/** Kafka partition id */
val partition: Int,
/** Leader's hostname */
final class Broker private(
/** Broker's hostname */
val host: String,
/** Leader's port */
val port: Int) extends Serializable
/** Broker's port */
val port: Int) extends Serializable {

override def equals(obj: Any): Boolean = obj match {
case that: Broker =>
this.host == that.host &&
this.port == that.port
case _ => false
}

override def hashCode: Int = {
41 * (41 + host.hashCode) + port
}

override def toString(): String = {
s"Broker($host, $port)"
}

}

/**
* :: Experimental ::
* Companion object the provides methods to create instances of [[Leader]].
* Companion object that provides methods to create instances of [[Broker]].
*/
@Experimental
object Leader {
def create(topic: String, partition: Int, host: String, port: Int): Leader =
new Leader(topic, partition, host, port)

def create(topicAndPartition: TopicAndPartition, host: String, port: Int): Leader =
new Leader(topicAndPartition.topic, topicAndPartition.partition, host, port)

def apply(topic: String, partition: Int, host: String, port: Int): Leader =
new Leader(topic, partition, host, port)
object Broker {
def create(host: String, port: Int): Broker =
new Broker(host, port)

def apply(topicAndPartition: TopicAndPartition, host: String, port: Int): Leader =
new Leader(topicAndPartition.topic, topicAndPartition.partition, host, port)
def apply(host: String, port: Int): Broker =
new Broker(host, port)

def unapply(broker: Broker): Option[(String, Int)] = {
if (broker == null) {
None
} else {
Some((broker.host, broker.port))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,19 @@ object KafkaUtils {
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}

/** Look up the leader broker (host, port) for every topic/partition covered by
 *  the given offset ranges, or throw a SparkException if any lookup fails. */
private def leadersForRanges(
    kafkaParams: Map[String, String],
    offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = {
  val cluster = new KafkaCluster(kafkaParams)
  // One TopicAndPartition per distinct (topic, partition) across all ranges.
  val topicPartitions = offsetRanges.map(r => TopicAndPartition(r.topic, r.partition)).toSet
  // Fail fast with the accumulated errors; otherwise unwrap the leader map as-is.
  cluster.findLeaders(topicPartitions).fold(
    errs => throw new SparkException(errs.mkString("\n")),
    identity
  )
}

/**
* Create a RDD from Kafka using offset ranges for each topic and partition.
*
Expand All @@ -176,12 +189,7 @@ object KafkaUtils {
offsetRanges: Array[OffsetRange]
): RDD[(K, V)] = {
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
val kc = new KafkaCluster(kafkaParams)
val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
val leaders = kc.findLeaders(topics).fold(
errs => throw new SparkException(errs.mkString("\n")),
ok => ok
)
val leaders = leadersForRanges(kafkaParams, offsetRanges)
new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
}

Expand All @@ -198,7 +206,8 @@ object KafkaUtils {
* host1:port1,host2:port2 form.
* @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition
* @param leaders Kafka leaders for each offset range in batch
* @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
* in which case leaders will be looked up on the driver.
* @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
Expand All @@ -211,12 +220,17 @@ object KafkaUtils {
sc: SparkContext,
kafkaParams: Map[String, String],
offsetRanges: Array[OffsetRange],
leaders: Array[Leader],
leaders: Map[TopicAndPartition, Broker],
messageHandler: MessageAndMetadata[K, V] => R
): RDD[R] = {
val leaderMap = leaders
.map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
.toMap
val leaderMap = if (leaders.isEmpty) {
leadersForRanges(kafkaParams, offsetRanges)
} else {
// This could be avoided by refactoring KafkaRDD.leaders and KafkaCluster to use Broker
leaders.map {
case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
}.toMap
}
new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
}

Expand Down Expand Up @@ -263,7 +277,8 @@ object KafkaUtils {
* host1:port1,host2:port2 form.
* @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition
* @param leaders Kafka leaders for each offset range in batch
* @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
* in which case leaders will be looked up on the driver.
* @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
Expand All @@ -276,16 +291,17 @@ object KafkaUtils {
recordClass: Class[R],
kafkaParams: JMap[String, String],
offsetRanges: Array[OffsetRange],
leaders: Array[Leader],
leaders: JMap[TopicAndPartition, Broker],
messageHandler: JFunction[MessageAndMetadata[K, V], R]
): JavaRDD[R] = {
implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
val leaderMap = Map(leaders.toSeq: _*)
createRDD[K, V, KD, VD, R](
jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaderMap, messageHandler.call _)
}

/**
Expand Down

0 comments on commit e9cece4

Please sign in to comment.