Skip to content

Commit

Permalink
[SPARK-10142] [STREAMING] Made python checkpoint recovery handle non-…
Browse files Browse the repository at this point in the history
…local checkpoint paths and existing SparkContexts

The current code only checks checkpoint files in local filesystem, and always tries to create a new Python SparkContext (even if one already exists). The solution is to do the following:
1. Use the same code path as Java to check whether a valid checkpoint exists
2. Create a new Python SparkContext only if there is no active one.

There is no unit test for this code path, as it is hard to test with distributed-filesystem paths in a local unit test. I am going to test it with a distributed file system manually to verify that this patch works.

Author: Tathagata Das <[email protected]>

Closes #8366 from tdas/SPARK-10142 and squashes the following commits:

3afa666 [Tathagata Das] Added tests
2dd4ae5 [Tathagata Das] Added the check to not create a context if one already exists
9bf151b [Tathagata Das] Made python checkpoint recovery use java to find the checkpoint files
  • Loading branch information
tdas committed Aug 24, 2015
1 parent b963c19 commit 053d94f
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 16 deletions.
22 changes: 13 additions & 9 deletions python/pyspark/streaming/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,26 +150,30 @@ def getOrCreate(cls, checkpointPath, setupFunc):
@param checkpointPath: Checkpoint directory used in an earlier streaming program
@param setupFunc: Function to create a new context and setup DStreams
"""
# TODO: support checkpoint in HDFS
if not os.path.exists(checkpointPath) or not os.listdir(checkpointPath):
cls._ensure_initialized()
gw = SparkContext._gateway

# Check whether valid checkpoint information exists in the given path
if gw.jvm.CheckpointReader.read(checkpointPath).isEmpty():
ssc = setupFunc()
ssc.checkpoint(checkpointPath)
return ssc

cls._ensure_initialized()
gw = SparkContext._gateway

try:
jssc = gw.jvm.JavaStreamingContext(checkpointPath)
except Exception:
print("failed to load StreamingContext from checkpoint", file=sys.stderr)
raise

jsc = jssc.sparkContext()
conf = SparkConf(_jconf=jsc.getConf())
sc = SparkContext(conf=conf, gateway=gw, jsc=jsc)
# If there is already an active instance of Python SparkContext use it, or create a new one
if not SparkContext._active_spark_context:
jsc = jssc.sparkContext()
conf = SparkConf(_jconf=jsc.getConf())
SparkContext(conf=conf, gateway=gw, jsc=jsc)

sc = SparkContext._active_spark_context

# update ctx in serializer
SparkContext._active_spark_context = sc
cls._transformerSerializer.ctx = sc
return StreamingContext(sc, None, jssc)

Expand Down
43 changes: 36 additions & 7 deletions python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,10 @@ def tearDownClass():
def tearDown(self):
if self.ssc is not None:
self.ssc.stop(True)
if self.sc is not None:
self.sc.stop()
if self.cpd is not None:
shutil.rmtree(self.cpd)

def test_get_or_create_and_get_active_or_create(self):
inputd = tempfile.mkdtemp()
Expand All @@ -622,8 +626,12 @@ def setup():
self.setupCalled = True
return ssc

cpd = tempfile.mkdtemp("test_streaming_cps")
self.ssc = StreamingContext.getOrCreate(cpd, setup)
# Verify that getOrCreate() calls setup() in absence of checkpoint files
self.cpd = tempfile.mkdtemp("test_streaming_cps")
self.setupCalled = False
self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
self.assertFalse(self.setupCalled)

self.ssc.start()

def check_output(n):
Expand Down Expand Up @@ -660,31 +668,52 @@ def check_output(n):
self.ssc.stop(True, True)
time.sleep(1)
self.setupCalled = False
self.ssc = StreamingContext.getOrCreate(cpd, setup)
self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
self.assertFalse(self.setupCalled)
self.ssc.start()
check_output(3)

# Verify that getOrCreate() uses existing SparkContext
self.ssc.stop(True, True)
time.sleep(1)
sc = SparkContext(SparkConf())
self.setupCalled = False
self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
self.assertFalse(self.setupCalled)
self.assertTrue(self.ssc.sparkContext == sc)

# Verify the getActiveOrCreate() recovers from checkpoint files
self.ssc.stop(True, True)
time.sleep(1)
self.setupCalled = False
self.ssc = StreamingContext.getActiveOrCreate(cpd, setup)
self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
self.assertFalse(self.setupCalled)
self.ssc.start()
check_output(4)

# Verify that getActiveOrCreate() returns active context
self.setupCalled = False
self.assertEquals(StreamingContext.getActiveOrCreate(cpd, setup), self.ssc)
self.assertEquals(StreamingContext.getActiveOrCreate(self.cpd, setup), self.ssc)
self.assertFalse(self.setupCalled)

# Verify that getActiveOrCreate() uses existing SparkContext
self.ssc.stop(True, True)
time.sleep(1)
self.sc = SparkContext(SparkConf())
self.setupCalled = False
self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
self.assertFalse(self.setupCalled)
self.assertTrue(self.ssc.sparkContext == sc)

# Verify that getActiveOrCreate() calls setup() in absence of checkpoint files
self.ssc.stop(True, True)
shutil.rmtree(cpd) # delete checkpoint directory
shutil.rmtree(self.cpd) # delete checkpoint directory
time.sleep(1)
self.setupCalled = False
self.ssc = StreamingContext.getActiveOrCreate(cpd, setup)
self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
self.assertTrue(self.setupCalled)

# Stop everything
self.ssc.stop(True, True)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,15 @@ class CheckpointWriter(
private[streaming]
object CheckpointReader extends Logging {

/**
 * Convenience overload that reads checkpoint files from the given directory using a
 * default [[SparkConf]] and the default Hadoop configuration, ignoring read errors.
 * Returns None when the directory contains no checkpoint files or none of them can be
 * read correctly; otherwise returns the latest valid checkpoint object.
 */
def read(checkpointDir: String): Option[Checkpoint] =
  read(checkpointDir, new SparkConf(), SparkHadoopUtil.get.conf, ignoreReadError = true)

/**
* Read checkpoint files present in the given checkpoint directory. If there are no checkpoint
* files, then return None, else try to return the latest valid checkpoint object. If no
Expand Down

0 comments on commit 053d94f

Please sign in to comment.