Skip to content

Commit

Permalink
Merge pull request #1537 from GeorgiKhomeriki/master
Browse files Browse the repository at this point in the history
recursive scheduling in RxScala
  • Loading branch information
benjchristensen committed Aug 8, 2014
2 parents b12f3eb + cc1ffe6 commit b34bc69
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,22 @@ trait Worker extends Subscription {
)
}

/**
* Schedules an Action for recursively repeated execution.
*
* @param action the Action to schedule recursively
* @return a subscription to be able to unsubscribe the action
*/
def scheduleRec(action: => Unit): Subscription = {
def work: Unit = {
action
if (!this.isUnsubscribed) {
this.schedule(work)
}
}
this.schedule(work)
}

/**
* Schedules a cancelable action to be executed periodically. This default implementation schedules
* recursively and waits for actions to complete (instead of potentially executing long-running actions
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package rx.lang.scala

import java.util.concurrent.TimeUnit

import org.junit.Assert.assertTrue
import org.junit.Test
import org.scalatest.junit.JUnitSuite
import rx.lang.scala.schedulers.TestScheduler

class SchedulerTests extends JUnitSuite {

@Test def testScheduleRecSingleRound() {
val scheduler = TestScheduler()
val worker = scheduler.createWorker
var count = 0
worker.scheduleRec({ count += 1; worker.unsubscribe() })
scheduler.advanceTimeBy(1L, TimeUnit.SECONDS)
assertTrue(count == 1)
}

@Test def testScheduleRecMultipleRounds() {
val scheduler = TestScheduler()
val worker = scheduler.createWorker
var count = 0
worker.scheduleRec({ count += 1; if(count == 100) worker.unsubscribe() })
scheduler.advanceTimeBy(1L, TimeUnit.SECONDS)
assertTrue(count == 100)
}

@Test def testScheduleRecUnsubscribe() {
val scheduler = TestScheduler()
val worker = scheduler.createWorker
var count = 0
val subscription = worker.scheduleRec({ count += 1 })
subscription.unsubscribe()
scheduler.advanceTimeBy(1L, TimeUnit.SECONDS)
assertTrue(count == 0)
}

@Test(expected = classOf[Exception])
def testScheduleRecException() {
val scheduler = TestScheduler()
scheduler.createWorker.scheduleRec({ throw new Exception() })
scheduler.advanceTimeBy(1L, TimeUnit.SECONDS)
}

}

0 comments on commit b34bc69

Please sign in to comment.