test case that shows why the current implementation is wrong from an end-user perspective
koeninger committed Sep 22, 2016
1 parent 881b206 commit 8e86f98
Showing 2 changed files with 72 additions and 1 deletion.
KafkaSourceSuite.scala

@@ -155,6 +155,38 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext {
    testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
  }

test("users will delete topics") {
val topicPrefix = newTopic()
val topic = topicPrefix + "-seems"
val topic2 = topicPrefix + "-bad"
testUtils.createTopic(topic, partitions = 5)
testUtils.sendMessages(topic, Array("-1"))
require(testUtils.getLatestOffsets(Set(topic)).size === 5)

val reader = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.group.id", s"group-$topic")
.option("kafka.auto.offset.reset", s"latest")
.option("kafka.metadata.max.age.ms", "1")
.option("subscribePattern", s"$topicPrefix-.*")

val kafka = reader.load().select("key", "value").as[(Array[Byte], Array[Byte])]
val mapped = kafka.map(kv => new String(kv._2).toInt + 1)

testStream(mapped)(
AddKafkaData(Set(topic), 1, 2, 3),
CheckAnswer(2, 3, 4),
Assert {
testUtils.deleteTopic(topic, 5)
testUtils.createTopic(topic2, partitions = 5)
true
},
AddKafkaData(Set(topic2), 4, 5, 6),
CheckAnswer(2, 3, 4, 5, 6, 7)
)
}

  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"

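For context, here is a minimal standalone sketch (not part of the commit) of the end-user scenario the new test models: a streaming query subscribed by pattern while matching topics are deleted and created behind its back. The broker address, checkpoint path, topic pattern, and object name are illustrative assumptions, not values from the commit.

import org.apache.spark.sql.SparkSession

object DeletedTopicScenario {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("deleted-topic-scenario")
      .getOrCreate()

    // Subscribe by pattern, as the test does, so that topics created or
    // deleted at runtime change the set of partitions the source must track.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker
      .option("subscribePattern", "events-.*")             // assumed pattern
      .load()

    val query = df.selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/deleted-topic-demo") // assumed path
      .start()

    // Deleting a matched topic and creating another that matches the pattern
    // while this query runs is exactly what the test above simulates.
    query.awaitTermination()
  }
}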
KafkaTestUtils.scala

@@ -29,7 +29,8 @@ import scala.util.Random

import kafka.admin.AdminUtils
import kafka.api.Request
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.common.TopicAndPartition
import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint}
import kafka.utils.ZkUtils
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
@@ -171,6 +172,12 @@ class KafkaTestUtils extends Logging {
    createTopic(topic, 1)
  }

  /** Delete a Kafka topic and wait until it is propagated to the whole cluster */
  def deleteTopic(topic: String, partitions: Int): Unit = {
    AdminUtils.deleteTopic(zkUtils, topic)
    verifyTopicDeletion(zkUtils, topic, partitions, List(this.server))
  }

  /** Add new partitions to a Kafka topic */
  def addPartitions(topic: String, partitions: Int): Unit = {
    AdminUtils.addPartitions(zkUtils, topic, partitions)
@@ -234,6 +241,7 @@ class KafkaTestUtils extends Logging {
props.put("zookeeper.connect", zkAddress)
props.put("log.flush.interval.messages", "1")
props.put("replica.socket.timeout.ms", "1500")
props.put("delete.topic.enable", "true")
props
}

@@ -257,6 +265,37 @@
    props
  }

  private def verifyTopicDeletion(
      zkUtils: ZkUtils,
      topic: String,
      numPartitions: Int,
      servers: Seq[KafkaServer]) {
    import ZkUtils._
    val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
    def isDeleted(): Boolean = {
      // wait until admin path for delete topic is deleted, signaling completion of topic deletion
      val deletePath = !zkUtils.pathExists(getDeleteTopicPath(topic))
      val topicPath = !zkUtils.pathExists(getTopicPath(topic))
      // ensure that the topic-partition has been deleted from all brokers' replica managers
      val replicaManager = servers.forall(server => topicAndPartitions.forall(tp =>
        server.replicaManager.getPartition(tp.topic, tp.partition) == None))
      // ensure that logs from all replicas are deleted if delete topic is marked successful
      val logManager = servers.forall(server => topicAndPartitions.forall(tp =>
        server.getLogManager().getLog(tp).isEmpty))
      // ensure that topic is removed from all cleaner offsets
      val cleaner = servers.forall(server => topicAndPartitions.forall { tp =>
        val checkpoints = server.getLogManager().logDirs.map { logDir =>
          new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
        }
        checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
      })
      deletePath && topicPath && replicaManager && logManager && cleaner
    }
    eventually(timeout(10.seconds)) {
      assert(isDeleted, s"$topic not deleted after timeout")
    }
  }

  private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
    def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
      case Some(partitionState) =>
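For reference, a hedged sketch of how a suite could drive the new helper end to end. The setup/teardown lifecycle mirrors how existing suites use KafkaTestUtils; the topic name and payloads are made up for illustration.

// Sketch only, under the assumptions above.
val testUtils = new KafkaTestUtils
testUtils.setup()
try {
  testUtils.createTopic("doomed-topic", partitions = 3)
  testUtils.sendMessages("doomed-topic", Array("1", "2", "3"))
  // deleteTopic blocks (via verifyTopicDeletion) until the ZK delete path is
  // gone, replica managers and log managers have dropped the partitions, and
  // cleaner checkpoints no longer mention them, so immediately recreating a
  // topic with the same name cannot race the in-flight deletion.
  testUtils.deleteTopic("doomed-topic", partitions = 3)
  testUtils.createTopic("doomed-topic", partitions = 3)
} finally {
  testUtils.teardown()
}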
