From 0d0bf29ce595c078de8f4a5ec60d435e6132ad81 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Fri, 7 Nov 2014 23:34:24 +0000 Subject: [PATCH] Update checkpoint call as in https://github.com/apache/spark/pull/2735 --- .../streaming/JavaRecoverableNetworkWordCount.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java index 04d56c70a2779..bceda97f058ea 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java @@ -69,7 +69,10 @@ public final class JavaRecoverableNetworkWordCount { private static final Pattern SPACE = Pattern.compile(" "); - private static JavaStreamingContext createContext(String ip, int port, String outputPath) { + private static JavaStreamingContext createContext(String ip, + int port, + String checkpointDirectory, + String outputPath) { // If you do not see this printed, that means the StreamingContext has been loaded // from the new checkpoint @@ -81,6 +84,7 @@ private static JavaStreamingContext createContext(String ip, int port, String ou SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount"); // Create the context with a 1 second batch size JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); + ssc.checkpoint(checkpointDirectory); // Create a socket stream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') @@ -135,12 +139,12 @@ public static void main(String[] args) { final String ip = args[0]; final int port = Integer.parseInt(args[1]); - String checkpointDirectory = args[2]; + final String checkpointDirectory = args[2]; final String outputPath = args[3]; JavaStreamingContextFactory factory = new JavaStreamingContextFactory() { @Override public JavaStreamingContext create() { - return createContext(ip, port, outputPath); + return createContext(ip, port, checkpointDirectory, outputPath); } }; JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);