Skip to content

Commit

Permalink
Add preferredLocation param for ReliableKafkaReceiver
Browse files Browse the repository at this point in the history
Add preferredLocation param for ReliableKafkaReceiver
  • Loading branch information
zzcclp committed Mar 11, 2015
1 parent a272a5e commit ddb62d3
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.apache.spark.util.Utils
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
* @param preferredLocation optional preferred location (hostname) on which to run the receiver.
*/
private[streaming]
class KafkaInputDStream[
Expand All @@ -52,14 +53,15 @@ class KafkaInputDStream[
kafkaParams: Map[String, String],
topics: Map[String, Int],
useReliableReceiver: Boolean,
storageLevel: StorageLevel
storageLevel: StorageLevel,
preferredLocation: Option[String] = None
) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {

def getReceiver(): Receiver[(K, V)] = {
if (!useReliableReceiver) {
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
} else {
new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel, preferredLocation)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,28 @@ object KafkaUtils {
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
}

/**
 * Create an input stream that pulls messages from a Kafka Broker, optionally pinned
 * to a preferred host.
 * @param ssc StreamingContext object
 * @param kafkaParams Map of kafka configuration parameters,
 *                    see http://kafka.apache.org/08/configuration.html
 * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
 *               in its own thread.
 * @param storageLevel Storage level to use for storing the received objects
 * @param preferredLocation specify a preferred location (hostname)
 */
def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel,
    preferredLocation: Option[String]
  ): ReceiverInputDStream[(K, V)] = {
  // The WAL flag decides whether the reliable receiver implementation is used downstream.
  val useWal = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
  new KafkaInputDStream[K, V, U, T](
    ssc, kafkaParams, topics, useWal, storageLevel, preferredLocation)
}

/**
* Create an input stream that pulls messages from Kafka Brokers.
* Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class ReliableKafkaReceiver[
T <: Decoder[_]: ClassTag](
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel)
storageLevel: StorageLevel,
_preferredLocation: Option[String])
extends Receiver[(K, V)](storageLevel) with Logging {

private val groupId = kafkaParams("group.id")
Expand Down Expand Up @@ -176,6 +177,8 @@ class ReliableKafkaReceiver[
}
}

/**
 * Preferred host (hostname) on which to schedule this receiver, as supplied by the
 * caller; `None` leaves placement to the scheduler.
 */
// NOTE: explicit result type added — public overrides should not rely on inference.
override def preferredLocation: Option[String] = _preferredLocation

/** Store a Kafka message and the associated metadata as a tuple. */
private def storeMessageAndMetadata(
msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
Expand Down

0 comments on commit ddb62d3

Please sign in to comment.