Skip to content

Commit

Permalink
[SPARK-6752] [STREAMING] [REVISED] Allow StreamingContext to be recre…
Browse files Browse the repository at this point in the history
…ated from checkpoint and existing SparkContext

This is a revision of the earlier version (see apache#5773) that passed the active SparkContext explicitly through a new set of Java and Scala API. The drawbacks are.

* Hard to implement in python.
* New API introduced. This is even more confusing since we are introducing getActiveOrCreate in SPARK-7553

Furthermore, there is now a direct way get an existing active SparkContext or create a new on - SparkContext.getOrCreate(conf). Its better to use this to get the SparkContext rather than have a new API to explicitly pass the context.

So in this PR I have
* Removed the new versions of StreamingContext.getOrCreate() which took SparkContext
* Added the ability to pick up existing SparkContext when the StreamingContext tries to create a SparkContext.

Author: Tathagata Das <[email protected]>

Closes apache#6096 from tdas/SPARK-6752 and squashes the following commits:

53f4b2d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-6752
f024b77 [Tathagata Das] Removed extra API and used SparkContext.getOrCreate
  • Loading branch information
tdas committed May 14, 2015
1 parent 59aaa1d commit bce00da
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class StreamingContext private[streaming] (
if (sc_ != null) {
sc_
} else if (isCheckpointPresent) {
new SparkContext(cp_.createSparkConf())
SparkContext.getOrCreate(cp_.createSparkConf())
} else {
throw new SparkException("Cannot create StreamingContext without a SparkContext")
}
Expand Down Expand Up @@ -750,53 +750,6 @@ object StreamingContext extends Logging {
checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}

/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
* recreated from the checkpoint data. If the data does not exist, then the StreamingContext
* will be created by called the provided `creatingFunc` on the provided `sparkContext`. Note
* that the SparkConf configuration in the checkpoint data will not be restored as the
* SparkContext has already been created.
*
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
* @param creatingFunc Function to create a new StreamingContext using the given SparkContext
* @param sparkContext SparkContext using which the StreamingContext will be created
*/
def getOrCreate(
checkpointPath: String,
creatingFunc: SparkContext => StreamingContext,
sparkContext: SparkContext
): StreamingContext = {
getOrCreate(checkpointPath, creatingFunc, sparkContext, createOnError = false)
}

/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
* recreated from the checkpoint data. If the data does not exist, then the StreamingContext
* will be created by called the provided `creatingFunc` on the provided `sparkContext`. Note
* that the SparkConf configuration in the checkpoint data will not be restored as the
* SparkContext has already been created.
*
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
* @param creatingFunc Function to create a new StreamingContext using the given SparkContext
* @param sparkContext SparkContext using which the StreamingContext will be created
* @param createOnError Whether to create a new StreamingContext if there is an
* error in reading checkpoint data. By default, an exception will be
* thrown on error.
*/
def getOrCreate(
checkpointPath: String,
creatingFunc: SparkContext => StreamingContext,
sparkContext: SparkContext,
createOnError: Boolean
): StreamingContext = {
val checkpointOption = CheckpointReader.read(
checkpointPath, sparkContext.conf, sparkContext.hadoopConfiguration, createOnError)
checkpointOption.map(new StreamingContext(sparkContext, _, null))
.getOrElse(creatingFunc(sparkContext))
}

/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to StreamingContext.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -804,51 +804,6 @@ object JavaStreamingContext {
new JavaStreamingContext(ssc)
}

/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
* recreated from the checkpoint data. If the data does not exist, then the provided factory
* will be used to create a JavaStreamingContext.
*
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
* @param creatingFunc Function to create a new JavaStreamingContext
* @param sparkContext SparkContext using which the StreamingContext will be created
*/
def getOrCreate(
checkpointPath: String,
creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
sparkContext: JavaSparkContext
): JavaStreamingContext = {
val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
}, sparkContext.sc)
new JavaStreamingContext(ssc)
}

/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
* recreated from the checkpoint data. If the data does not exist, then the provided factory
* will be used to create a JavaStreamingContext.
*
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
* @param creatingFunc Function to create a new JavaStreamingContext
* @param sparkContext SparkContext using which the StreamingContext will be created
* @param createOnError Whether to create a new JavaStreamingContext if there is an
* error in reading checkpoint data.
*/
def getOrCreate(
checkpointPath: String,
creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
sparkContext: JavaSparkContext,
createOnError: Boolean
): JavaStreamingContext = {
val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
}, sparkContext.sc, createOnError)
new JavaStreamingContext(ssc)
}

/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to StreamingContext.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1766,29 +1766,10 @@ public JavaStreamingContext call() {
Assert.assertTrue("old context not recovered", !newContextCreated.get());
ssc.stop();

// Function to create JavaStreamingContext using existing JavaSparkContext
// without any output operations (used to detect the new context)
Function<JavaSparkContext, JavaStreamingContext> creatingFunc2 =
new Function<JavaSparkContext, JavaStreamingContext>() {
public JavaStreamingContext call(JavaSparkContext context) {
newContextCreated.set(true);
return new JavaStreamingContext(context, Seconds.apply(1));
}
};

JavaSparkContext sc = new JavaSparkContext(conf);
newContextCreated.set(false);
ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc);
Assert.assertTrue("new context not created", newContextCreated.get());
ssc.stop(false);

newContextCreated.set(false);
ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true);
Assert.assertTrue("new context not created", newContextCreated.get());
ssc.stop(false);

newContextCreated.set(false);
ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc);
JavaSparkContext sc = new JavaSparkContext(conf);
ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
new org.apache.hadoop.conf.Configuration());
Assert.assertTrue("old context not recovered", !newContextCreated.get());
ssc.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,76 +419,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
assert(ssc != null, "no context created")
assert(!newContextCreated, "old context not recovered")
assert(ssc.conf.get("someKey") === "someValue")
}
}

test("getOrCreate with existing SparkContext") {
val conf = new SparkConf().setMaster(master).setAppName(appName)
sc = new SparkContext(conf)

// Function to create StreamingContext that has a config to identify it to be new context
var newContextCreated = false
def creatingFunction(sparkContext: SparkContext): StreamingContext = {
newContextCreated = true
new StreamingContext(sparkContext, batchDuration)
}

// Call ssc.stop(stopSparkContext = false) after a body of cody
def testGetOrCreate(body: => Unit): Unit = {
newContextCreated = false
try {
body
} finally {
if (ssc != null) {
ssc.stop(stopSparkContext = false)
}
ssc = null
}
}

val emptyPath = Utils.createTempDir().getAbsolutePath()

// getOrCreate should create new context with empty path
testGetOrCreate {
ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _, sc, createOnError = true)
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
assert(ssc.conf.get("someKey") === "someValue", "checkpointed config not recovered")
}

val corrutedCheckpointPath = createCorruptedCheckpoint()

// getOrCreate should throw exception with fake checkpoint file and createOnError = false
intercept[Exception] {
ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _, sc)
}

// getOrCreate should throw exception with fake checkpoint file
intercept[Exception] {
ssc = StreamingContext.getOrCreate(
corrutedCheckpointPath, creatingFunction _, sc, createOnError = false)
}

// getOrCreate should create new context with fake checkpoint file and createOnError = true
testGetOrCreate {
ssc = StreamingContext.getOrCreate(
corrutedCheckpointPath, creatingFunction _, sc, createOnError = true)
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
}

val checkpointPath = createValidCheckpoint()

// StreamingContext.getOrCreate should recover context with checkpoint path
// getOrCreate should recover StreamingContext with existing SparkContext
testGetOrCreate {
ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _, sc)
sc = new SparkContext(conf)
ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
assert(ssc != null, "no context created")
assert(!newContextCreated, "old context not recovered")
assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
assert(!ssc.conf.contains("someKey"),
"recovered StreamingContext unexpectedly has old config")
assert(!ssc.conf.contains("someKey"), "checkpointed config unexpectedly recovered")
}
}

Expand Down

0 comments on commit bce00da

Please sign in to comment.