Skip to content

Commit

Permalink
[Streaming][Kafka][SPARK-8389] Example of getting offset ranges out o…
Browse files Browse the repository at this point in the history
…f the existing java direct stream api
  • Loading branch information
koeninger committed Jun 16, 2015
1 parent 658814c commit 3f3c57a
Showing 1 changed file with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
Expand Down Expand Up @@ -67,8 +68,8 @@ public void tearDown() {

@Test
public void testKafkaStream() throws InterruptedException {
String topic1 = "topic1";
String topic2 = "topic2";
final String topic1 = "topic1";
final String topic2 = "topic2";

String[] topic1data = createTopicAndSendData(topic1);
String[] topic2data = createTopicAndSendData(topic2);
Expand All @@ -89,6 +90,16 @@ public void testKafkaStream() throws InterruptedException {
StringDecoder.class,
kafkaParams,
topicToSet(topic1)
).transformToPair(
// Make sure you can get offset ranges from the rdd
new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
@Override
public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
OffsetRange[] offsets = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
Assert.assertEquals(offsets[0].topic(), topic1);
return rdd;
}
}
).map(
new Function<Tuple2<String, String>, String>() {
@Override
Expand Down

0 comments on commit 3f3c57a

Please sign in to comment.