Skip to content

Commit

Permalink
Update checkpoint call as in apache#2735
Browse files Browse the repository at this point in the history
  • Loading branch information
srowen committed Nov 7, 2014
1 parent 35f23e3 commit 0d0bf29
Showing 1 changed file with 7 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@
public final class JavaRecoverableNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");

private static JavaStreamingContext createContext(String ip, int port, String outputPath) {
private static JavaStreamingContext createContext(String ip,
int port,
String checkpointDirectory,
String outputPath) {

// If you do not see this printed, that means the StreamingContext has been loaded
// from the new checkpoint
Expand All @@ -81,6 +84,7 @@ private static JavaStreamingContext createContext(String ip, int port, String ou
SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount");
// Create the context with a 1 second batch size
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
ssc.checkpoint(checkpointDirectory);

// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
Expand Down Expand Up @@ -135,12 +139,12 @@ public static void main(String[] args) {

final String ip = args[0];
final int port = Integer.parseInt(args[1]);
String checkpointDirectory = args[2];
final String checkpointDirectory = args[2];
final String outputPath = args[3];
JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
@Override
public JavaStreamingContext create() {
return createContext(ip, port, outputPath);
return createContext(ip, port, checkpointDirectory, outputPath);
}
};
JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
Expand Down

0 comments on commit 0d0bf29

Please sign in to comment.