Skip to content

Commit

Permalink
[Streaming][Kafka][SPARK-8127] check offset ranges before constructing KafkaRDD
Browse files Browse the repository at this point in the history
  • Loading branch information
koeninger committed Jun 5, 2015
1 parent c3768c5 commit 8974b9e
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,14 @@ private[spark]
object KafkaCluster {
type Err = ArrayBuffer[Throwable]

/**
 * Unwraps the success value of `result`, or throws a [[SparkException]] whose
 * message concatenates every accumulated error (one per line).
 *
 * @param result either an accumulated error buffer (`Err`) or a success value
 * @return the `Right` value when present
 * @throws SparkException when `result` is a `Left`
 */
def checkErrors[T](result: Either[Err, T]): T = {
  result match {
    case Left(errs) => throw new SparkException(errs.mkString("\n"))
    case Right(ok)  => ok
  }
}

private[spark]
case class LeaderOffset(host: String, port: Int, offset: Long)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,25 +75,24 @@ class KafkaRDD[
override def isEmpty(): Boolean = count == 0L

override def take(num: Int): Array[R] = {
val nonEmpty = this.partitions
val nonEmptyPartitions = this.partitions
.map(_.asInstanceOf[KafkaRDDPartition])
.filter(_.count > 0)

if (num < 1 || nonEmpty.size < 1) {
if (num < 1 || nonEmptyPartitions.size < 1) {
return new Array[R](0)
}

var remain = num.toLong
// Determine in advance how many messages need to be taken from each partition
val parts = nonEmpty.flatMap { part =>
val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
val remain = num - result.values.sum
if (remain > 0) {
val taken = Math.min(remain, part.count)
remain = remain - taken
Some((part.index -> taken.toInt))
result + (part.index -> taken.toInt)
} else {
None
result
}
}.toMap
}

val buf = new ArrayBuffer[R]
val res = context.runJob(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,30 @@ object KafkaUtils {

/**
 * Finds the leader broker (host, port) for each topic/partition covered by the
 * given offset ranges, or throws a [[SparkException]] if leader lookup fails.
 *
 * NOTE(review): this span was diff residue interleaving the pre- and post-commit
 * versions; reconstructed here as the post-commit definition, which takes an
 * already-constructed KafkaCluster so callers can reuse it for offset checks.
 *
 * @param kc           connected Kafka cluster metadata client
 * @param offsetRanges ranges whose topic/partitions need leaders resolved
 * @return map from topic/partition to its leader's (host, port)
 * @throws SparkException if any leader cannot be found
 */
private def leadersForRanges(
    kc: KafkaCluster,
    offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = {
  val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
  val leaders = kc.findLeaders(topics)
  // checkErrors throws SparkException on Left, returns the leader map on Right
  KafkaCluster.checkErrors(leaders)
}

/**
 * Make sure every requested offset range is currently available on its leader,
 * or throw a [[SparkException]] naming the unavailable ranges.
 *
 * A range is valid when the leader's earliest offset <= fromOffset and
 * untilOffset <= the leader's latest offset; anything outside that window has
 * been compacted/expired or not yet produced.
 *
 * @param kc           connected Kafka cluster metadata client
 * @param offsetRanges ranges to validate against leader offsets
 * @throws SparkException if leader offsets cannot be fetched, or if any range
 *                        falls outside the leader's available window
 */
private def checkOffsets(
    kc: KafkaCluster,
    offsetRanges: Array[OffsetRange]): Unit = {
  val topics = offsetRanges.map(_.topicAndPartition).toSet
  val badRanges = KafkaCluster.checkErrors(for {
    low <- kc.getEarliestLeaderOffsets(topics).right
    high <- kc.getLatestLeaderOffsets(topics).right
  } yield {
    // keep only the ranges NOT fully contained in [earliest, latest]
    offsetRanges.filterNot { o =>
      low(o.topicAndPartition).offset <= o.fromOffset &&
        o.untilOffset <= high(o.topicAndPartition).offset
    }
  })
  if (badRanges.nonEmpty) {
    throw new SparkException("Offsets not available on leader: " + badRanges.mkString(","))
  }
}

/**
Expand All @@ -191,7 +206,9 @@ object KafkaUtils {
offsetRanges: Array[OffsetRange]
): RDD[(K, V)] = sc.withScope {
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
val leaders = leadersForRanges(kafkaParams, offsetRanges)
val kc = new KafkaCluster(kafkaParams)
val leaders = leadersForRanges(kc, offsetRanges)
checkOffsets(kc, offsetRanges)
new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
}

Expand Down Expand Up @@ -225,15 +242,17 @@ object KafkaUtils {
leaders: Map[TopicAndPartition, Broker],
messageHandler: MessageAndMetadata[K, V] => R
): RDD[R] = sc.withScope {
val kc = new KafkaCluster(kafkaParams)
val leaderMap = if (leaders.isEmpty) {
leadersForRanges(kafkaParams, offsetRanges)
leadersForRanges(kc, offsetRanges)
} else {
// This could be avoided by refactoring KafkaRDD.leaders and KafkaCluster to use Broker
leaders.map {
case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
}.toMap
}
val cleanedHandler = sc.clean(messageHandler)
checkOffsets(kc, offsetRanges)
new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, cleanedHandler)
}

Expand Down Expand Up @@ -399,7 +418,7 @@ object KafkaUtils {
val kc = new KafkaCluster(kafkaParams)
val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)

(for {
KafkaCluster.checkErrors(for {
topicPartitions <- kc.getPartitions(topics).right
leaderOffsets <- (if (reset == Some("smallest")) {
kc.getEarliestLeaderOffsets(topicPartitions)
Expand All @@ -412,10 +431,7 @@ object KafkaUtils {
}
new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, fromOffsets, messageHandler)
}).fold(
errs => throw new SparkException(errs.mkString("\n")),
ok => ok
)
})
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ final class OffsetRange private(
val untilOffset: Long) extends Serializable {
import OffsetRange.OffsetRangeTuple

/**
 * This range's topic/partition pair as a Kafka [[TopicAndPartition]],
 * for convenient interop with Kafka client APIs.
 */
def topicAndPartition(): TopicAndPartition = {
  TopicAndPartition(topic, partition)
}

/** Number of messages this OffsetRange refers to */
def count(): Long = untilOffset - fromOffset

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
assert(rdd.take(1).size === 1)
assert(messages(rdd.take(1).head._2))
assert(rdd.take(messages.size + 10).size === messages.size)

// invalid offset ranges throw exceptions
val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1))
intercept[SparkException] {
KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
sc, kafkaParams, badRanges)
}
}

test("iterator boundary conditions") {
Expand Down

0 comments on commit 8974b9e

Please sign in to comment.