[SPARK-19182][DStream] Optimize the lock in StreamingJobProgressListener to not block UI when generating Streaming jobs #16601

Closed
wants to merge 3 commits into from

uncleGen
Contributor

What changes were proposed in this pull request?

When DStreamGraph is generating a job, it holds a lock and blocks other APIs. Because StreamingJobProgressListener (numInactiveReceivers, streamName(streamId: Int), streamIds) needs to call DStreamGraph's methods to access some information, the UI may hang if job generation is very slow (e.g., talking to a slow Kafka cluster to fetch metadata).
It's better to optimize the locks in DStreamGraph and StreamingJobProgressListener so that job generation does not block the UI.

How was this patch tested?

Existing unit tests.

cc @zsxwing

@SparkQA

SparkQA commented Jan 16, 2017

Test build #71441 has finished for PR 16601 at commit 46036bf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@zsxwing zsxwing left a comment

We can just save the information that StreamingJobProgressListener needs in DStreamGraph.start, so that StreamingJobProgressListener can read it without taking the lock.
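The suggestion above can be sketched roughly as follows. This is a minimal illustration, not the code merged in this PR: `GraphSketch` and `StreamInfo` are hypothetical stand-ins for DStreamGraph and its input streams, and `generateJobs` only simulates slow work under the lock.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an input stream; not Spark's InputDStream.
final case class StreamInfo(id: Int, name: String, isReceiver: Boolean)

// Hypothetical stand-in for DStreamGraph.
class GraphSketch {
  private val inputStreams = ArrayBuffer.empty[StreamInfo]

  // Immutable snapshots published in start(). @volatile makes the write
  // visible to reader threads that never take the graph lock.
  @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil
  @volatile private var numReceivers: Int = 0

  def addInputStream(s: StreamInfo): Unit = synchronized { inputStreams += s }

  def start(): Unit = synchronized {
    // Copy what the UI needs into immutable values while holding the lock.
    inputStreamNameAndID = inputStreams.map(s => (s.name, s.id)).toList
    numReceivers = inputStreams.count(_.isReceiver)
  }

  // Simulates slow job generation that holds the graph lock.
  def generateJobs(workMillis: Long): Unit = synchronized { Thread.sleep(workMillis) }

  // Lock-free reads for the UI listener.
  def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID
  def getNumReceivers: Int = numReceivers
}
```

With this shape, a thread calling `getNumReceivers` while another thread is inside `generateJobs` does not wait for the lock, because the getters only read the volatile fields.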

jobOption.foreach(_.setCallSite(outputStream.creationSite))
jobOption
}
val jobs = getOutputStreams().flatMap { outputStream =>
Member

synchronized is there to make sure writeObject never writes an intermediate state of DStreamGraph.
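To illustrate the point about serialization, here is a small sketch under stated assumptions: `GraphState` and `GraphStateDemo` are hypothetical names, and the two `Long` fields stand in for state that must change together. The serialization hook takes the same lock as the mutator, so a checkpoint cannot capture the object halfway through an update.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException}
import java.io.{ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the serializable state of a graph.
class GraphState extends Serializable {
  private var zeroTime: Long = -1L
  private var startTime: Long = -1L

  // Both fields change under the lock, so no other synchronized block
  // can observe one of them updated without the other.
  def start(time: Long): Unit = this.synchronized {
    zeroTime = time
    startTime = time
  }

  def times: (Long, Long) = this.synchronized((zeroTime, startTime))

  // Java serialization hook. Taking the same lock means the object is
  // never written while start() is partway through.
  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = this.synchronized {
    oos.defaultWriteObject()
  }
}

object GraphStateDemo {
  // Serializes and deserializes a state object in memory.
  def roundTrip(state: GraphState): GraphState = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(state)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[GraphState] finally in.close()
  }
}
```

This is why the lock on the mutating methods cannot simply be dropped: only the read paths used by the UI are candidates for lock-free access.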

Contributor Author

Yes, I oversimplified the problem.

@SparkQA

SparkQA commented Jan 17, 2017

Test build #71492 has finished for PR 16601 at commit eaa7b15.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@uncleGen
Contributor Author

@zsxwing Please take a look!

Member

@zsxwing zsxwing left a comment

Looks pretty good except for some nits.

var rememberDuration: Duration = null
var checkpointInProgress = false

var zeroTime: Time = null
var startTime: Time = null
var batchDuration: Duration = null
var numReceivers: Int = 0
Member

nit: add @volatile private

@@ -31,12 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()

val inputStreamNameAndID = new ArrayBuffer[(String, Int)]()
Member

nit: change it to @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil and just set it in start. Don't expose a mutable ArrayBuffer to the caller.

def getInputStreamName(streamId: Int): Option[String] = synchronized {
inputStreams.find(_.id == streamId).map(_.name)
}
def getReceiverNumber: Int = numReceivers
Member

nit: rename it to getNumReceivers for consistency.

@@ -197,17 +197,17 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
}

def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
completedBatchUIData.toSeq
completedBatchUIData
Member

nit: could you change toSeq to toIndexedSeq to avoid exposing a mutable collection?
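The concern in this comment can be shown with a short sketch. `BatchStore` and its members are hypothetical and are not the listener's real fields. The `leaky` accessor returns the live buffer typed as a read-only `Seq`, so the caller still observes later mutations, while `snapshot` copies into an immutable `IndexedSeq`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical store of completed batch names.
class BatchStore {
  private val completed = ArrayBuffer.empty[String]

  def add(batch: String): Unit = synchronized { completed += batch }

  // Leaky: hands out the live mutable buffer behind a read-only type,
  // so the caller sees every later add() and can race with writers.
  def leaky: scala.collection.Seq[String] = synchronized { completed }

  // Safe: copies the current contents into an immutable collection.
  def snapshot: scala.collection.immutable.IndexedSeq[String] =
    synchronized { completed.toIndexedSeq }
}
```

Whether `toSeq` on a mutable buffer copies or returns the buffer itself depends on the Scala collections version in use, which is one reason an explicit `toIndexedSeq` is the more predictable choice here.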

@SparkQA

SparkQA commented Jan 18, 2017

Test build #71553 has finished for PR 16601 at commit e51623c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Jan 18, 2017

LGTM. Merging to master. Thanks!

@asfgit asfgit closed this in a81e336 Jan 18, 2017
zzcclp added a commit to zzcclp/spark that referenced this pull request Jan 20, 2017
…Listener to not block UI when generating Streaming jobs apache#16601
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…ner to not block UI when generating Streaming jobs

## What changes were proposed in this pull request?

When DStreamGraph is generating a job, it holds a lock and blocks other APIs. Because StreamingJobProgressListener (numInactiveReceivers, streamName(streamId: Int), streamIds) needs to call DStreamGraph's methods to access some information, the UI may hang if job generation is very slow (e.g., talking to a slow Kafka cluster to fetch metadata).
It's better to optimize the locks in DStreamGraph and StreamingJobProgressListener so that job generation does not block the UI.

## How was this patch tested?
Existing unit tests.

cc zsxwing

Author: uncleGen <[email protected]>

Closes apache#16601 from uncleGen/SPARK-19182.
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017
…ner to not block UI when generating Streaming jobs
