Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Second Batch of Optimizations — IO.apply #91

Merged
merged 2 commits into from
Dec 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ sealed abstract class IO[+A] {
final def to[F[_]](implicit F: cats.effect.Async[F]): F[A @uncheckedVariance] =
this match {
case Pure(a) => F.pure(a)
case Delay(thunk) => F.delay(thunk())
case RaiseError(e) => F.raiseError(e)
case Suspend(thunk) => F.suspend(thunk().to[F])
case Async(k) => F.async(k)
Expand Down Expand Up @@ -331,7 +332,7 @@ object IO extends IOInstances {
* Any exceptions thrown by the effect will be caught and sequenced
* into the `IO`.
*/
def apply[A](body: => A): IO[A] = suspend(Pure(body))
def apply[A](body: => A): IO[A] = Delay(body _)

/**
* Suspends a synchronous side effect which produces an `IO` in `IO`.
Expand Down Expand Up @@ -560,6 +561,8 @@ object IO extends IOInstances {

private[effect] final case class Pure[+A](a: A)
extends IO[A]
private[effect] final case class Delay[+A](thunk: () => A)
extends IO[A]
private[effect] final case class RaiseError(e: Throwable)
extends IO[Nothing]
private[effect] final case class Suspend[+A](thunk: () => IO[A])
Expand Down
267 changes: 160 additions & 107 deletions core/shared/src/main/scala/cats/effect/internals/IORunLoop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,139 +17,192 @@
package cats.effect.internals

import cats.effect.IO
import cats.effect.IO.{Async, Bind, Pure, RaiseError, Suspend}
import scala.annotation.tailrec
import cats.effect.IO.{Async, Bind, Delay, Pure, RaiseError, Suspend}
import scala.collection.mutable.ArrayStack

private[effect] object IORunLoop {
private type Current = IO[Any]
private type Bind = Any => IO[Any]
private type CallStack = ArrayStack[Bind]
private type Callback = Either[Throwable, Any] => Unit

/** Evaluates the given `IO` reference, calling the given callback
* with the result when completed.
*/
def start[A](source: IO[A], cb: Either[Throwable, A] => Unit): Unit =
loop(source, cb.asInstanceOf[Callback], null, null, null)

/** Evaluates the given `IO` reference until an asynchronous
* boundary is hit.
*/
def step[A](source: IO[A]): IO[A] =
step(source, null, null).asInstanceOf[IO[A]]

private type Current = IO[Any]
private type Bind = Any => IO[Any]
private type CallStack = ArrayStack[Bind]
private type Callback = Either[Throwable, Any] => Unit

/** Tail-recursive loop that evaluates an `IO` reference.
/** Loop for evaluating an `IO` value.
*
* Note `rcb`, `bFirst` and `bRest` ARE nullable, because
* initialization is avoided until the last possible moment to
* reduce pressure on heap memory.
* The `rcbRef`, `bFirstRef` and `bRestRef` parameters are
* nullable values that can be supplied because the loop needs
* to be resumed in [[RestartCallback]].
*/
@tailrec private def loop(
private def loop(
source: Current,
cb: Callback,
rcb: RestartCallback,
bFirst: Bind,
bRest: CallStack): Unit = {

source match {
case Bind(fa, bindNext) =>
var callStack: CallStack = bRest
if (bFirst ne null) {
if (callStack eq null) callStack = new ArrayStack()
callStack.push(bFirst)
}
// Next iteration please
loop(fa, cb, rcb, bindNext, callStack)
cb: Either[Throwable, Any] => Unit,
rcbRef: RestartCallback,
bFirstRef: Bind,
bRestRef: CallStack): Unit = {

case Pure(value) =>
popNextBind(bFirst, bRest) match {
case null => cb(Right(value))
case bind =>
val fa = try bind(value) catch { case NonFatal(ex) => RaiseError(ex) }
// Next iteration please
loop(fa, cb, rcb, null, bRest)
}
var currentIO: Current = source
var bFirst: Bind = bFirstRef
var bRest: CallStack = bRestRef
var rcb: RestartCallback = rcbRef
// Values from Pure and Delay are unboxed in this var,
// for code reuse between Pure and Delay
var hasUnboxed: Boolean = false
var unboxed: AnyRef = null

case Suspend(thunk) =>
// Next iteration please
val fa = try thunk() catch { case NonFatal(ex) => RaiseError(ex) }
loop(fa, cb, rcb, bFirst, bRest)
do {
currentIO match {
case Bind(fa, bindNext) =>
if (bFirst ne null) {
if (bRest eq null) bRest = new ArrayStack()
bRest.push(bFirst)
}
bFirst = bindNext.asInstanceOf[Bind]
currentIO = fa

case RaiseError(ex) =>
findErrorHandler(bFirst, bRest) match {
case null => cb(Left(ex))
case bind =>
val fa = try bind.recover(ex) catch { case NonFatal(e) => RaiseError(e) }
// Next cycle please
loop(fa, cb, rcb, null, bRest)
}
case Pure(value) =>
unboxed = value.asInstanceOf[AnyRef]
hasUnboxed = true

case Async(register) =>
val restartCallback = if (rcb != null) rcb else RestartCallback(cb)
restartCallback.prepare(bFirst, bRest)
register(restartCallback)
}
}
case Delay(thunk) =>
try {
unboxed = thunk().asInstanceOf[AnyRef]
hasUnboxed = true
currentIO = null
} catch { case NonFatal(e) =>
currentIO = RaiseError(e)
}

/** A [[loop]] variant that evaluates the given `IO` reference
* until the first async boundary, or until the final result,
* whichever comes first.
*
* Note `bFirst` and `bRest` are nullable references, in order
* to avoid initialization until the last possible moment.
*/
@tailrec private def step(
source: Current,
bFirst: Bind,
bRest: CallStack): IO[Any] = {

source match {
case Bind(fa, bindNext) =>
var callStack: CallStack = bRest
if (bFirst ne null) {
if (callStack eq null) callStack = new ArrayStack()
callStack.push(bFirst)
}
// Next iteration please
step(fa, bindNext.asInstanceOf[Bind], callStack)
case Suspend(thunk) =>
currentIO = try thunk() catch { case NonFatal(ex) => RaiseError(ex) }

case RaiseError(ex) =>
findErrorHandler(bFirst, bRest) match {
case null =>
cb(Left(ex))
return
case bind =>
val fa = try bind.recover(ex) catch { case NonFatal(e) => RaiseError(e) }
bFirst = null
currentIO = fa
}

case ref @ Pure(value) =>
case Async(register) =>
if (rcb eq null) rcb = RestartCallback(cb.asInstanceOf[Callback])
rcb.prepare(bFirst, bRest)
register(rcb)
return
}

if (hasUnboxed) {
popNextBind(bFirst, bRest) match {
case null => ref
case null =>
cb(Right(unboxed))
return
case bind =>
val fa = try bind(value) catch { case NonFatal(ex) => RaiseError(ex) }
// Next iteration please
step(fa, null, bRest)
val fa = try bind(unboxed) catch { case NonFatal(ex) => RaiseError(ex) }
hasUnboxed = false
unboxed = null
bFirst = null
currentIO = fa
}
}
} while (true)
}

/** Evaluates the given `IO` reference until an asynchronous
* boundary is hit.
*/
def step[A](source: IO[A]): IO[A] = {
var currentIO: Current = source
var bFirst: Bind = null
var bRest: CallStack = null
// Values from Pure and Delay are unboxed in this var,
// for code reuse between Pure and Delay
var hasUnboxed: Boolean = false
var unboxed: AnyRef = null

do {
currentIO match {
case Bind(fa, bindNext) =>
if (bFirst ne null) {
if (bRest eq null) bRest = new ArrayStack()
bRest.push(bFirst)
}
bFirst = bindNext.asInstanceOf[Bind]
currentIO = fa

case Pure(value) =>
unboxed = value.asInstanceOf[AnyRef]
hasUnboxed = true

case Suspend(thunk) =>
val fa = try thunk() catch { case NonFatal(ex) => RaiseError(ex) }
// Next iteration please
step(fa, bFirst, bRest)
case Delay(thunk) =>
try {
unboxed = thunk().asInstanceOf[AnyRef]
hasUnboxed = true
currentIO = null
} catch { case NonFatal(e) =>
currentIO = RaiseError(e)
}

case Suspend(thunk) =>
currentIO = try thunk() catch { case NonFatal(ex) => RaiseError(ex) }

case RaiseError(ex) =>
findErrorHandler(bFirst, bRest) match {
case null =>
return currentIO.asInstanceOf[IO[A]]
case bind =>
val fa = try bind.recover(ex) catch { case NonFatal(e) => RaiseError(e) }
bFirst = null
currentIO = fa
}

case ref @ RaiseError(ex) =>
findErrorHandler(bFirst, bRest) match {
case null => ref
case Async(register) =>
// Cannot inline the code of this method — as it would
// box those vars in scala.runtime.ObjectRef!
return suspendInAsync(currentIO.asInstanceOf[IO[A]], bFirst, bRest, register)
}

if (hasUnboxed) {
popNextBind(bFirst, bRest) match {
case null =>
return (if (currentIO ne null) currentIO else Pure(unboxed))
.asInstanceOf[IO[A]]
case bind =>
val fa = try bind.recover(ex) catch { case NonFatal(e) => RaiseError(e) }
// Next cycle please
step(fa, null, bRest)
currentIO = try bind(unboxed) catch { case NonFatal(ex) => RaiseError(ex) }
hasUnboxed = false
unboxed = null
bFirst = null
}
}
} while (true)
// $COVERAGE-OFF$
null // Unreachable code
// $COVERAGE-ON$
}

case Async(register) =>
// Hitting an async boundary means we have to stop, however
// if we had previous `flatMap` operations prior to this, then
// we need to resume the loop with the collected stack
if (bFirst != null || (bRest != null && bRest.nonEmpty))
Async[Any] { cb =>
val rcb = RestartCallback(cb)
rcb.prepare(bFirst, bRest)
register(rcb)
}
else
source
}
private def suspendInAsync[A](
currentIO: IO[A],
bFirst: Bind,
bRest: CallStack,
register: (Either[Throwable, Any] => Unit) => Unit): IO[A] = {

// Hitting an async boundary means we have to stop, however
// if we had previous `flatMap` operations then we need to resume
// the loop with the collected stack
if (bFirst != null || (bRest != null && bRest.nonEmpty))
Async { cb =>
val rcb = RestartCallback(cb.asInstanceOf[Callback])
rcb.prepare(bFirst, bRest)
register(rcb)
}
else
currentIO
}

/** Pops the next bind function from the stack, but filters out
Expand Down