[SPARK-19182][DStream] Optimize the lock in StreamingJobProgressListener to not block UI when generating Streaming jobs #16601
Test build #71441 has finished for PR 16601 at commit
We can just save the information that will be used by StreamingJobProgressListener in DStreamGraph.start so that StreamingJobProgressListener can just use them without the lock.
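A minimal sketch of that suggestion, assuming a hypothetical `GraphSketch` class rather than Spark's real `DStreamGraph`: the data the UI needs is copied into an immutable value inside `start`, published through a `@volatile` field, and then read without taking the graph lock. The stream names and the latch-based "slow job generation" are illustrative assumptions only.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for DStreamGraph; not Spark's actual class.
final class GraphSketch(inputs: Seq[(String, Int)]) {
  // Published once in start(); read afterwards without taking the lock.
  @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil

  def start(): Unit = synchronized {
    inputStreamNameAndID = inputs.toList // immutable snapshot
  }

  // Simulates slow job generation: holds the graph lock until released.
  def generateJobs(entered: CountDownLatch, release: CountDownLatch): Unit = synchronized {
    entered.countDown()
    release.await()
  }

  // Lock-free read, as a UI listener would perform it.
  def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID
}

object GraphSketchDemo {
  // Returns what a reader sees while generateJobs still holds the lock.
  def readWhileGenerating(): Seq[(String, Int)] = {
    val graph = new GraphSketch(Seq("kafka-stream" -> 0, "socket-stream" -> 1))
    graph.start()
    val entered = new CountDownLatch(1)
    val release = new CountDownLatch(1)
    val generator = new Thread(new Runnable {
      def run(): Unit = graph.generateJobs(entered, release)
    })
    generator.start()
    entered.await()                          // the generator thread now holds the lock
    val seen = graph.getInputStreamNameAndID // does not wait for the lock
    release.countDown()
    generator.join()
    seen
  }
}
```

If `getInputStreamNameAndID` were itself `synchronized`, the read in `readWhileGenerating` would wait until the generator thread released the lock, which is the UI hang this PR is trying to avoid.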
jobOption.foreach(_.setCallSite(outputStream.creationSite))
jobOption
}
val jobs = getOutputStreams().flatMap { outputStream =>
synchronized is there to make sure writeObject never writes an intermediate state of DStreamGraph.
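To illustrate the point, here is a small sketch under stated assumptions: `GuardedState` is a hypothetical class, not Spark code. Its mutator and its Java-serialization hook `writeObject` take the same lock, so serialization cannot observe the object between the two assignments in `setRange`. The `lastWriteHeldLock` flag exists only so the example can check that the hook really ran while holding the lock.

```scala
import java.io.{ByteArrayOutputStream, IOException, ObjectOutputStream}

// Hypothetical class for illustration; not Spark's DStreamGraph.
class GuardedState extends Serializable {
  private var low: Int = 0
  private var high: Int = 0

  // Transient: records whether the last writeObject call held the lock.
  @transient @volatile private var wroteUnderLock: Boolean = false

  // Two-step update; between the assignments the state is intermediate.
  def setRange(l: Int, h: Int): Unit = this.synchronized {
    low = l
    high = h
  }

  def range: (Int, Int) = this.synchronized((low, high))

  def lastWriteHeldLock: Boolean = wroteUnderLock

  // Java serialization hook; takes the same lock as setRange.
  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = this.synchronized {
    wroteUnderLock = Thread.holdsLock(this)
    oos.defaultWriteObject()
  }
}

object GuardedStateDemo {
  // Serializes the state with plain Java serialization and returns the bytes.
  def serialize(s: GuardedState): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(s) finally out.close()
    bytes.toByteArray
  }
}
```

The trade-off raised in this thread follows directly: any other method that takes the same lock, such as slow job generation, also delays serialization and every other synchronized reader.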
Yes, I oversimplified the problem.
Test build #71492 has finished for PR 16601 at commit
@zsxwing Please take a look!
Looks pretty good except some nits.
var rememberDuration: Duration = null
var checkpointInProgress = false

var zeroTime: Time = null
var startTime: Time = null
var batchDuration: Duration = null
var numReceivers: Int = 0
nit: add @volatile private
@@ -31,12 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()

val inputStreamNameAndID = new ArrayBuffer[(String, Int)]()
nit: change it to @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil and just set it in start. Don't expose a mutable ArrayBuffer to the caller.
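The difference can be sketched as follows. `LeakyGraph` and `SafeGraph` are hypothetical names used only for this comparison, and the stream names are made up. The first hands callers its live buffer, while the second publishes an immutable copy that is set once in `start`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical: exposes the live buffer, so any caller can change internal state.
class LeakyGraph {
  val inputStreamNameAndID = new ArrayBuffer[(String, Int)]()
}

// Hypothetical: keeps the buffer private and publishes an immutable copy.
class SafeGraph {
  private val inputStreams = new ArrayBuffer[(String, Int)]()
  @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil

  def addInputStream(name: String, id: Int): Unit = synchronized {
    inputStreams += ((name, id))
  }

  def start(): Unit = synchronized {
    inputStreamNameAndID = inputStreams.toList // fixed at start time
  }

  def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID
}

object ExposureDemo {
  // A caller appends to the graph's internal buffer from the outside.
  def leakedSize(): Int = {
    val g = new LeakyGraph
    g.inputStreamNameAndID += (("rogue", 99))
    g.inputStreamNameAndID.size
  }

  // Later changes to the private buffer do not affect the published copy.
  def snapshotSize(): Int = {
    val g = new SafeGraph
    g.addInputStream("kafka-stream", 0)
    g.start()
    g.addInputStream("late-stream", 1)
    g.getInputStreamNameAndID.size
  }
}
```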
def getInputStreamName(streamId: Int): Option[String] = synchronized {
inputStreams.find(_.id == streamId).map(_.name)
}
def getReceiverNumber: Int = numReceivers
nit: getNumReceivers for consistency.
@@ -197,17 +197,17 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
}

def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
completedBatchUIData.toSeq
completedBatchUIData
nit: could you change toSeq to toIndexedSeq to avoid exposing a mutable collection.
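A small sketch of why this matters, assuming a hypothetical `ListenerSketch` in which `String` stands in for `BatchUIData`: `toIndexedSeq` returns an immutable copy, so a value handed to the UI does not change when later batches are recorded. Whether `toSeq` on a mutable collection copies or returns the collection itself depends on the Scala version, which is why the explicit `toIndexedSeq` is the safer choice.

```scala
import scala.collection.mutable

// Hypothetical listener; String stands in for BatchUIData.
class ListenerSketch {
  private val completedBatchUIData = new mutable.Queue[String]()

  def onBatchCompleted(batch: String): Unit = synchronized {
    completedBatchUIData += batch
  }

  // Returns an immutable snapshot taken while holding the lock.
  def retainedCompletedBatches: IndexedSeq[String] = synchronized {
    completedBatchUIData.toIndexedSeq
  }
}

object ListenerSketchDemo {
  // Takes a snapshot, records another batch, then takes a second snapshot.
  def snapshotThenMutate(): (Seq[String], Seq[String]) = {
    val listener = new ListenerSketch
    listener.onBatchCompleted("batch-1")
    val before = listener.retainedCompletedBatches
    listener.onBatchCompleted("batch-2")
    (before, listener.retainedCompletedBatches)
  }
}
```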
Test build #71553 has finished for PR 16601 at commit
LGTM. Merging to master. Thanks!
…ner to not block UI when generating Streaming jobs ## What changes were proposed in this pull request? When DStreamGraph is generating a job, it will hold a lock and block other APIs. Because StreamingJobProgressListener (numInactiveReceivers, streamName(streamId: Int), streamIds) needs to call DStreamGraph's methods to access some information, the UI may hang if generating a job is very slow (e.g., talking to the slow Kafka cluster to fetch metadata). It's better to optimize the locks in DStreamGraph and StreamingJobProgressListener to make the UI not block by job generation. ## How was this patch tested? existing ut cc zsxwing Author: uncleGen <[email protected]> Closes apache#16601 from uncleGen/SPARK-19182.
What changes were proposed in this pull request?
When DStreamGraph is generating a job, it holds a lock and blocks other APIs. Because StreamingJobProgressListener (numInactiveReceivers, streamName(streamId: Int), streamIds) needs to call DStreamGraph's methods to access some information, the UI may hang if generating a job is very slow (e.g., talking to a slow Kafka cluster to fetch metadata).
It's better to optimize the locks in DStreamGraph and StreamingJobProgressListener so that the UI is not blocked by job generation.
How was this patch tested?
Existing unit tests.
cc @zsxwing