From 6af80c8484b4a68e42d9caf1216a3c59bc45c36a Mon Sep 17 00:00:00 2001
From: Egor Krivokon
Date: Thu, 4 Oct 2018 18:47:23 +0300
Subject: [PATCH] [MAPR-32290] Spark processing offsets when messages are already ttl in first batch (#368)

---
 .../kafka09/DirectKafkaInputDStream.scala | 35 ++++++++++++++++---
 1 file changed, 30 insertions(+), 5 deletions(-)

diff --git a/external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/DirectKafkaInputDStream.scala b/external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/DirectKafkaInputDStream.scala
index 1df8228cb9755..0c8b6ba244ec7 100644
--- a/external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/DirectKafkaInputDStream.scala
@@ -17,18 +17,16 @@
 
 package org.apache.spark.streaming.kafka09
 
-import java.{ util => ju }
+import java.{util => ju}
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicReference
 
-import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{StreamingContext, Time}
@@ -74,6 +72,14 @@ private[spark] class DirectKafkaInputDStream[K, V](
     kc
   }
 
+  def consumerForAssign(): KafkaConsumer[Long, String] = this.synchronized {
+    val properties = consumerStrategy.executorKafkaParams
+    properties.put("max.poll.records", "1")
+    properties.put(ConsumerConfig.GROUP_ID_CONFIG,
+      s"${properties.get(ConsumerConfig.GROUP_ID_CONFIG)}_assignGroup")
+    new KafkaConsumer[Long, String](properties)
+  }
+
   override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = {
     logError("Kafka ConsumerRecord is not serializable. " +
       "Use .map to extract fields before calling .persist or .window")
@@ -240,10 +246,29 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   override def start(): Unit = {
     val c = consumer
+    val consumerAssign = consumerForAssign
+    val pollTimeout = ssc.sparkContext.getConf
+      .getLong("spark.streaming.kafka.consumer.driver.poll.ms", 120000)
     paranoidPoll(c)
     if (currentOffsets.isEmpty) {
       currentOffsets = c.assignment().asScala.map { tp =>
-        tp -> c.position(tp)
+        tp -> {
+          val position = c.position(tp)
+
+          consumerAssign.assign(ju.Arrays.asList(tp))
+          val records = consumerAssign.poll(pollTimeout).iterator()
+          val firstRecordOffset = if (records.hasNext) {
+            records.next().offset()
+          } else {
+            c.endOffsets(ju.Arrays.asList(tp)).get(tp).longValue()
+          }
+
+          if (position < firstRecordOffset) {
+            firstRecordOffset
+          } else {
+            position
+          }
+        }
       }.toMap
     }
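
Note (not part of the patch): below is a minimal standalone sketch of the offset-clamping rule the diff adds to `start()`. The object and method names (`StartingOffsetSketch`, `probeEarliestOffset`, `resolveStartingOffset`) are illustrative assumptions, not code from the patch; the patch itself builds its probe consumer via `consumerForAssign()` and calls `endOffsets` on the main driver consumer when the poll returns nothing.

```scala
import java.{util => ju}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Hypothetical helper object, for illustration only.
object StartingOffsetSketch {

  // Poll a single record from a dedicated "probe" consumer to learn the first
  // offset that is still readable on the broker; if nothing comes back within
  // the timeout, fall back to the end offset (the partition is empty or fully expired).
  def probeEarliestOffset[K, V](probe: KafkaConsumer[K, V],
                                tp: TopicPartition,
                                pollTimeoutMs: Long): Long = {
    probe.assign(ju.Arrays.asList(tp))
    val records = probe.poll(pollTimeoutMs).iterator()
    if (records.hasNext) records.next().offset()
    else probe.endOffsets(ju.Arrays.asList(tp)).get(tp).longValue()
  }

  // If the stored position points at data that has already been removed by TTL,
  // start from the first readable offset instead of the stale position.
  def resolveStartingOffset(position: Long, firstReadable: Long): Long =
    if (position < firstReadable) firstReadable else position
}
```

In effect the stream starts each partition at max(current position, first readable offset), so the first batch no longer asks for offsets the broker has already expired. The poll timeout used by the patch comes from `spark.streaming.kafka.consumer.driver.poll.ms` (default 120000 ms).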