Skip to content

Commit

Permalink
slight change
Browse files Browse the repository at this point in the history
  • Loading branch information
HeartSaVioR committed Sep 11, 2023
1 parent 9031723 commit 7922879
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ class AsyncProgressTrackingMicroBatchExecution(
asyncWritesExecutorService.getQueue.size() > 0 || asyncWritesExecutorService.getActiveCount > 0
}

override protected def validateAndGetTrigger(): TriggerExecutor = {
override protected def getTrigger(): TriggerExecutor = validateAndGetTrigger()

private def validateAndGetTrigger(): TriggerExecutor = {
// validate that the pipeline is using a supported sink
if (!extraOptions
.getOrElse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class MicroBatchExecution(

@volatile protected[sql] var triggerExecutor: TriggerExecutor = _

protected def validateAndGetTrigger(): TriggerExecutor = {
protected def getTrigger(): TriggerExecutor = {
assert(sources.nonEmpty, "sources should have been retrieved from the plan!")
trigger match {
case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
Expand Down Expand Up @@ -155,7 +155,7 @@ class MicroBatchExecution(

// Initializing TriggerExecutor relies on `sources`, hence calling this after initializing
// sources.
triggerExecutor = validateAndGetTrigger()
triggerExecutor = getTrigger()

uniqueSources = triggerExecutor match {
case _: SingleBatchExecutor =>
Expand All @@ -175,7 +175,7 @@ class MicroBatchExecution(
sources.distinct.map {
case s: SupportsTriggerAvailableNow => s
case _ =>
throw new IllegalStateException("Should not reach here! Check validateAndGetTrigger().")
throw new IllegalStateException("Should not reach here!")
}.map { s =>
s.prepareForTriggerAvailableNow()
s -> s.getDefaultReadLimit
Expand Down

0 comments on commit 7922879

Please sign in to comment.