Skip to content

Commit

Permalink
Merge pull request #232 from oleg-py/all-the-things-you-can-shift
Browse files Browse the repository at this point in the history
Require Timer and shift automatically in concurrent IO operations
  • Loading branch information
alexandru authored May 23, 2018
2 parents 485fb4a + 010d6d7 commit 8ed6e71
Show file tree
Hide file tree
Showing 22 changed files with 189 additions and 203 deletions.
16 changes: 16 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,22 @@ val mimaSettings = Seq(
exclude[ReversedMissingMethodProblem]("cats.effect.Effect#EitherTEffect.toIO"),
exclude[ReversedMissingMethodProblem]("cats.effect.Effect.toIO"),
exclude[ReversedMissingMethodProblem]("cats.effect.ConcurrentEffect.toIO"),


// Require Timer[IO] for auto-shifting now
exclude[DirectMissingMethodProblem]("cats.effect.IO.start"),
exclude[DirectMissingMethodProblem]("cats.effect.IO.race"),
exclude[DirectMissingMethodProblem]("cats.effect.IO.racePair"),
exclude[DirectMissingMethodProblem]("cats.effect.IOParallelNewtype.ioEffect"),
exclude[DirectMissingMethodProblem]("cats.effect.IOInstances.parApplicative"),
exclude[DirectMissingMethodProblem]("cats.effect.IOInstances.ioParallel"),
exclude[DirectMissingMethodProblem]("cats.effect.IOInstances.ioConcurrentEffect"),
exclude[DirectMissingMethodProblem]("cats.effect.internals.IOParMap.apply"),
exclude[DirectMissingMethodProblem]("cats.effect.internals.IOCompanionBinaryCompat.ioEffect"),
exclude[DirectMissingMethodProblem]("cats.effect.internals.IORace.simple"),
exclude[DirectMissingMethodProblem]("cats.effect.internals.IORace.pair"),
exclude[DirectMissingMethodProblem]("cats.effect.internals.IOStart.apply"),

//
// Following are all internal implementation details:
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[effect] object IOAppPlatform {
IO.raiseError(new AssertionError("IOApp keep alive failed unexpectedly."))
case Right(exitCode) =>
IO.pure(exitCode.code)
}.start
}.start(timer.value)
}

val defaultTimer: Timer[IO] = IOTimer.global
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ private[effect] object IOAppPlatform {
}

def mainFiber(args: Array[String], timer: Eval[Timer[IO]])(run: List[String] => IO[ExitCode]): IO[Fiber[IO, Int]] = {
val _ = timer // unused in this platform

object Canceled extends RuntimeException
for {
latch <- IO(new CountDownLatch(1))
Expand All @@ -49,7 +47,7 @@ private[effect] object IOAppPlatform {
}
.productL(IO(latch.countDown()))
.map(_.code)
.start
.start(timer.value)
_ <- IO(sys.addShutdownHook {
fiber.cancel.unsafeRunSync()
latch.await()
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/cats/effect/Concurrent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ import scala.util.Either
@typeclass
@implicitNotFound("""Cannot find implicit value for Concurrent[${F}].
Building this implicit value might depend on having an implicit
s.c.ExecutionContext in scope, a Scheduler or some equivalent type.""")
s.c.ExecutionContext in scope, a Timer, Scheduler or some equivalent type.""")
trait Concurrent[F[_]] extends Async[F] {
/**
* Creates a cancelable `F[A]` instance that executes an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import scala.util.Either
@typeclass
@implicitNotFound("""Cannot find implicit value for ConcurrentEffect[${F}].
Building this implicit value might depend on having an implicit
s.c.ExecutionContext in scope, a Scheduler or some equivalent type.""")
s.c.ExecutionContext in scope, a Timer, Scheduler or some equivalent type.""")
trait ConcurrentEffect[F[_]] extends Concurrent[F] with Effect[F] {
/**
* Evaluates `F[_]` with the ability to cancel it.
Expand Down
160 changes: 83 additions & 77 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -365,13 +365,9 @@ sealed abstract class IO[+A] extends internals.IOBinaryCompat[A] {
* consider in the example above what would happen if the first task
* finishes in error. In that case the second task doesn't get canceled,
* which creates a potential memory leak.
*
* IMPORTANT — this operation does not start with an asynchronous boundary.
* But you can use [[IO.shift(implicit* IO.shift]] to force an async
* boundary just before start.
*/
final def start: IO[Fiber[IO, A @uncheckedVariance]] =
IOStart(this)
final def start(implicit timer: Timer[IO]): IO[Fiber[IO, A @uncheckedVariance]] =
IOStart(timer, this)

/**
* Returns a new `IO` that mirrors the source task for normal termination,
Expand Down Expand Up @@ -675,104 +671,114 @@ private[effect] abstract class IOLowPriorityInstances extends IOParallelNewtype
}

implicit def ioSemigroup[A: Semigroup]: Semigroup[IO[A]] = new IOSemigroup[A]

implicit val ioEffect: Effect[IO] = new IOEffect

private[effect] class IOEffect extends Effect[IO] with StackSafeMonad[IO] {
final override def pure[A](a: A): IO[A] =
IO.pure(a)
final override def unit: IO[Unit] =
IO.unit

final override def map[A, B](fa: IO[A])(f: A => B): IO[B] =
fa.map(f)
final override def flatMap[A, B](ioa: IO[A])(f: A => IO[B]): IO[B] =
ioa.flatMap(f)

final override def attempt[A](ioa: IO[A]): IO[Either[Throwable, A]] =
ioa.attempt
final override def handleErrorWith[A](ioa: IO[A])(f: Throwable => IO[A]): IO[A] =
ioa.handleErrorWith(f)
final override def raiseError[A](e: Throwable): IO[A] =
IO.raiseError(e)

final override def bracket[A, B](acquire: IO[A])
(use: A => IO[B])
(release: A => IO[Unit]): IO[B] =
acquire.bracket(use)(release)

final override def bracketCase[A, B](acquire: IO[A])
(use: A => IO[B])
(release: (A, ExitCase[Throwable]) => IO[Unit]): IO[B] =
acquire.bracketCase(use)(release)

final override def delay[A](thunk: => A): IO[A] =
IO(thunk)
final override def suspend[A](thunk: => IO[A]): IO[A] =
IO.suspend(thunk)
final override def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] =
IO.async(k)
override def liftIO[A](ioa: IO[A]): IO[A] =
ioa
override def toIO[A](fa: IO[A]): IO[A] =
fa
final override def runAsync[A](ioa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[Unit] =
ioa.runAsync(cb)
final override def runSyncStep[A](ioa: IO[A]): IO[Either[IO[A], A]] =
ioa.runSyncStep
}
}

private[effect] abstract class IOInstances extends IOLowPriorityInstances {

implicit val parApplicative: Applicative[IO.Par] = new Applicative[IO.Par] {
implicit def parApplicative(implicit timer: Timer[IO]): Applicative[IO.Par] = new Applicative[IO.Par] {
import IO.Par.{unwrap, apply => par}

override def pure[A](x: A): IO.Par[A] =
final override def pure[A](x: A): IO.Par[A] =
par(IO.pure(x))
override def map2[A, B, Z](fa: IO.Par[A], fb: IO.Par[B])(f: (A, B) => Z): IO.Par[Z] =
par(IOParMap(unwrap(fa), unwrap(fb))(f))
override def ap[A, B](ff: IO.Par[A => B])(fa: IO.Par[A]): IO.Par[B] =
final override def map2[A, B, Z](fa: IO.Par[A], fb: IO.Par[B])(f: (A, B) => Z): IO.Par[Z] =
par(IOParMap(timer, unwrap(fa), unwrap(fb))(f))
final override def ap[A, B](ff: IO.Par[A => B])(fa: IO.Par[A]): IO.Par[B] =
map2(ff, fa)(_(_))
override def product[A, B](fa: IO.Par[A], fb: IO.Par[B]): IO.Par[(A, B)] =
final override def product[A, B](fa: IO.Par[A], fb: IO.Par[B]): IO.Par[(A, B)] =
map2(fa, fb)((_, _))
override def map[A, B](fa: IO.Par[A])(f: A => B): IO.Par[B] =
final override def map[A, B](fa: IO.Par[A])(f: A => B): IO.Par[B] =
par(unwrap(fa).map(f))
override def unit: IO.Par[Unit] =
final override def unit: IO.Par[Unit] =
par(IO.unit)
}

implicit val ioConcurrentEffect: ConcurrentEffect[IO] = new ConcurrentEffect[IO] {
override def pure[A](a: A): IO[A] =
IO.pure(a)
override def flatMap[A, B](ioa: IO[A])(f: A => IO[B]): IO[B] =
ioa.flatMap(f)
override def map[A, B](fa: IO[A])(f: A => B): IO[B] =
fa.map(f)
override def delay[A](thunk: => A): IO[A] =
IO(thunk)
override def unit: IO[Unit] =
IO.unit
override def attempt[A](ioa: IO[A]): IO[Either[Throwable, A]] =
ioa.attempt
override def handleErrorWith[A](ioa: IO[A])(f: Throwable => IO[A]): IO[A] =
ioa.handleErrorWith(f)
override def raiseError[A](e: Throwable): IO[A] =
IO.raiseError(e)
override def suspend[A](thunk: => IO[A]): IO[A] =
IO.suspend(thunk)
override def start[A](fa: IO[A]): IO[Fiber[IO, A]] =
implicit def ioConcurrentEffect(implicit timer: Timer[IO]): ConcurrentEffect[IO] = new IOEffect with ConcurrentEffect[IO] {
final override def start[A](fa: IO[A]): IO[Fiber[IO, A]] =
fa.start
override def uncancelable[A](fa: IO[A]): IO[A] =
final override def uncancelable[A](fa: IO[A]): IO[A] =
fa.uncancelable
override def onCancelRaiseError[A](fa: IO[A], e: Throwable): IO[A] =
final override def onCancelRaiseError[A](fa: IO[A], e: Throwable): IO[A] =
fa.onCancelRaiseError(e)
override def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] =
IO.async(k)
override def race[A, B](fa: IO[A], fb: IO[B]): IO[Either[A, B]] =

final override def race[A, B](fa: IO[A], fb: IO[B]): IO[Either[A, B]] =
IO.race(fa, fb)
override def racePair[A, B](fa: IO[A], fb: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] =
final override def racePair[A, B](fa: IO[A], fb: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] =
IO.racePair(fa, fb)
override def runAsync[A](ioa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[Unit] =
ioa.runAsync(cb)
override def runSyncStep[A](ioa: IO[A]): IO[Either[IO[A], A]] =
ioa.runSyncStep
override def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A] =

final override def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A] =
IO.cancelable(k)
override def runCancelable[A](fa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[IO[Unit]] =
final override def runCancelable[A](fa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[IO[Unit]] =
fa.runCancelable(cb)
override def toIO[A](fa: IO[A]): IO[A] =
fa
override def liftIO[A](ioa: IO[A]): IO[A] =
ioa
// this will use stack proportional to the maximum number of joined async suspensions
override def tailRecM[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] =
f(a).flatMap {
case Left(a) => tailRecM(a)(f)
case Right(b) => pure(b)
}
override def bracket[A, B](acquire: IO[A])
(use: A => IO[B])
(release: A => IO[Unit]): IO[B] =
acquire.bracket(use)(release)
override def bracketCase[A, B](acquire: IO[A])
(use: A => IO[B])
(release: (A, ExitCase[Throwable]) => IO[Unit]): IO[B] =
acquire.bracketCase(use)(release)

final override def toIO[A](fa: IO[A]): IO[A] = fa
final override def liftIO[A](ioa: IO[A]): IO[A] = ioa
}

implicit val ioParallel: Parallel[IO, IO.Par] =
implicit def ioParallel(implicit timer: Timer[IO]): Parallel[IO, IO.Par] =
new Parallel[IO, IO.Par] {
override def applicative: Applicative[IO.Par] =
parApplicative
override def monad: Monad[IO] =
ioConcurrentEffect
override val sequential: ~>[IO.Par, IO] =
final override val applicative: Applicative[IO.Par] =
parApplicative(timer)
final override val monad: Monad[IO] =
ioConcurrentEffect(timer)

final override val sequential: ~>[IO.Par, IO] =
new FunctionK[IO.Par, IO] { def apply[A](fa: IO.Par[A]): IO[A] = IO.Par.unwrap(fa) }
override val parallel: ~>[IO, IO.Par] =
final override val parallel: ~>[IO, IO.Par] =
new FunctionK[IO, IO.Par] { def apply[A](fa: IO[A]): IO.Par[A] = IO.Par(fa) }
}

implicit def ioMonoid[A: Monoid]: Monoid[IO[A]] = new IOSemigroup[A] with Monoid[IO[A]] {
def empty = IO.pure(Monoid[A].empty)
final override def empty: IO[A] = IO.pure(Monoid[A].empty)
}

implicit val ioSemigroupK: SemigroupK[IO] = new SemigroupK[IO] {
def combineK[A](a: IO[A], b: IO[A]): IO[A] =
final override def combineK[A](a: IO[A], b: IO[A]): IO[A] =
ApplicativeError[IO, Throwable].handleErrorWith(a)(_ => b)
}
}
Expand Down Expand Up @@ -1172,8 +1178,8 @@ object IO extends IOInstances {
* Also see [[racePair]] for a version that does not cancel
* the loser automatically on successful results.
*/
def race[A, B](lh: IO[A], rh: IO[B]): IO[Either[A, B]] =
IORace.simple(lh, rh)
def race[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[A, B]] =
IORace.simple(timer, lh, rh)

/**
* Run two IO tasks concurrently, and returns a pair
Expand Down Expand Up @@ -1203,8 +1209,8 @@ object IO extends IOInstances {
* See [[race]] for a simpler version that cancels the loser
* immediately.
*/
def racePair[A, B](lh: IO[A], rh: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] =
IORace.pair(lh, rh)
def racePair[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] =
IORace.pair(timer, lh, rh)

private[effect] final case class Pure[+A](a: A)
extends IO[A]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,6 @@ private[effect] trait IOBinaryCompat[+A] { self: IO[A] =>
}

private[effect] trait IOCompanionBinaryCompat {
/**
* DEPRECATED — name and type changed in [[IO.ioConcurrentEffect]].
*
* This old variant is kept in order to keep binary compatibility
* until 1.0 — when it will be removed completely.
*/
@deprecated("Renamed to ioConcurrentEffect", "0.10")
private[internals] def ioEffect: Effect[IO] = IO.ioConcurrentEffect

/**
* DEPRECATED — the `ec` parameter is gone.
*
Expand Down
13 changes: 8 additions & 5 deletions core/shared/src/main/scala/cats/effect/internals/IOParMap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package cats.effect.internals

import cats.effect.IO
import cats.syntax.apply._
import cats.effect.{IO, Timer}
import cats.effect.internals.Callback.Extensions

import java.util.concurrent.atomic.AtomicReference

import scala.concurrent.ExecutionContext
Expand All @@ -27,7 +29,7 @@ private[effect] object IOParMap {
import Callback.{Type => Callback}

/** Implementation for `parMap2`. */
def apply[A, B, C](fa: IO[A], fb: IO[B])(f: (A, B) => C): IO[C] =
def apply[A, B, C](timer: Timer[IO], fa: IO[A], fb: IO[B])(f: (A, B) => C): IO[C] =
IO.Async { (conn, cb) =>
// For preventing stack-overflow errors; using a
// trampolined execution context, so no thread forks
Expand Down Expand Up @@ -56,8 +58,8 @@ private[effect] object IOParMap {
// NOTE: conn.pop() happens when cb gets called!
conn.push(() => Cancelable.cancelAll(connA.cancel, connB.cancel))

IORunLoop.startCancelable(fa, connA, callbackA(connB))
IORunLoop.startCancelable(fb, connB, callbackB(connA))
IORunLoop.startCancelable(timer.shift *> fa, connA, callbackA(connB))
IORunLoop.startCancelable(timer.shift *> fb, connB, callbackB(connA))
}

/** Callback for the left task. */
Expand Down Expand Up @@ -100,7 +102,7 @@ private[effect] object IOParMap {
}

/** Called when an error is generated. */
def sendError(other: IOConnection, e: Throwable): Unit =
def sendError(other: IOConnection, e: Throwable): Unit = {
state.getAndSet(e) match {
case _: Throwable =>
Logger.reportFailure(e)
Expand All @@ -109,6 +111,7 @@ private[effect] object IOParMap {
try other.cancel() finally
cb.async(conn, Left(e))
}
}
})
}
}
Loading

0 comments on commit 8ed6e71

Please sign in to comment.