Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-19182][DStream] Optimize the lock in StreamingJobProgressListener to not block UI when generating Streaming jobs #16601

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()

@volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil

var rememberDuration: Duration = null
var checkpointInProgress = false

var zeroTime: Time = null
var startTime: Time = null
var batchDuration: Duration = null
@volatile private var numReceivers: Int = 0

def start(time: Time) {
this.synchronized {
Expand All @@ -45,7 +48,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
startTime = time
outputStreams.foreach(_.initialize(zeroTime))
outputStreams.foreach(_.remember(rememberDuration))
outputStreams.foreach(_.validateAtStart)
outputStreams.foreach(_.validateAtStart())
numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]])
inputStreamNameAndID = inputStreams.map(is => (is.name, is.id))
inputStreams.par.foreach(_.start())
}
}
Expand Down Expand Up @@ -106,9 +111,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
.toArray
}

def getInputStreamName(streamId: Int): Option[String] = synchronized {
inputStreams.find(_.id == streamId).map(_.name)
}
/**
 * Returns the receiver count cached in `numReceivers`.
 *
 * The only visible assignment is in `start`, which stores the number of input streams that
 * are `ReceiverInputDStream` instances; before that the field holds its initial value of 0.
 * The field is `@volatile` and is read here without entering a `synchronized` block.
 */
def getNumReceivers: Int = numReceivers

/**
 * Returns the cached `(name, id)` pair of every input stream.
 *
 * The only visible assignment is in `start`, which snapshots `inputStreams` into
 * `inputStreamNameAndID`; before that the field holds its initial value `Nil`.
 * The field is `@volatile` and is read here without entering a `synchronized` block.
 */
def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID

def generateJobs(time: Time): Seq[Job] = {
logDebug("Generating jobs for time " + time)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
}

def numInactiveReceivers: Int = {
ssc.graph.getReceiverInputStreams().length - numActiveReceivers
ssc.graph.getNumReceivers - numActiveReceivers
}

def numTotalCompletedBatches: Long = synchronized {
Expand Down Expand Up @@ -197,17 +197,17 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
}

def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
completedBatchUIData.toSeq
completedBatchUIData.toIndexedSeq
}

def streamName(streamId: Int): Option[String] = {
ssc.graph.getInputStreamName(streamId)
ssc.graph.getInputStreamNameAndID.find(_._2 == streamId).map(_._1)
}

/**
* Return all InputDStream Ids
*/
def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)
def streamIds: Seq[Int] = ssc.graph.getInputStreamNameAndID.map(_._2)

/**
* Return all of the record rates for each InputDStream in each batch. The key of the return value
Expand Down