From 0934ef3e4e7aa88a524cc10c3ca8b8e4790328e6 Mon Sep 17 00:00:00 2001 From: Oleg Pyzhcov Date: Sat, 19 May 2018 15:48:12 +0300 Subject: [PATCH 1/6] Require Timer and shift automatically in concurrent IO operations --- .../cats/effect/internals/IOAppPlatform.scala | 2 +- .../cats/effect/internals/IOAppPlatform.scala | 4 +- .../main/scala/cats/effect/Concurrent.scala | 2 +- .../scala/cats/effect/ConcurrentEffect.scala | 2 +- .../src/main/scala/cats/effect/IO.scala | 123 +++++++++--------- .../effect/internals/IOBinaryCompat.scala | 9 -- .../test/scala/cats/effect/IOAppTests.scala | 15 +-- .../cats/effect/concurrent/MVarTests.scala | 12 +- .../cats/effect/internals/TestUtils.scala | 33 +++++ .../effect/laws/ConcurrentEffectLaws.scala | 3 +- .../cats/effect/laws/ConcurrentLaws.scala | 4 +- .../discipline/ConcurrentEffectTests.scala | 2 +- .../laws/discipline/ConcurrentTests.scala | 2 +- .../src/test/scala/cats/effect/IOTests.scala | 5 +- site/src/main/tut/concurrency/mvar.md | 4 +- site/src/main/tut/concurrency/ref.md | 6 +- site/src/main/tut/concurrency/semaphore.md | 2 +- site/src/main/tut/datatypes/fiber.md | 4 +- site/src/main/tut/datatypes/io.md | 43 ++---- 19 files changed, 143 insertions(+), 134 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/internals/IOAppPlatform.scala b/core/js/src/main/scala/cats/effect/internals/IOAppPlatform.scala index e8bf8a3fb4..ce83d9095c 100644 --- a/core/js/src/main/scala/cats/effect/internals/IOAppPlatform.scala +++ b/core/js/src/main/scala/cats/effect/internals/IOAppPlatform.scala @@ -50,7 +50,7 @@ private[effect] object IOAppPlatform { IO.raiseError(new AssertionError("IOApp keep alive failed unexpectedly.")) case Right(exitCode) => IO.pure(exitCode.code) - }.start + }.start(timer.value) } val defaultTimer: Timer[IO] = IOTimer.global diff --git a/core/jvm/src/main/scala/cats/effect/internals/IOAppPlatform.scala b/core/jvm/src/main/scala/cats/effect/internals/IOAppPlatform.scala index e34bb37c5f..fcaed1a390 100644 --- a/core/jvm/src/main/scala/cats/effect/internals/IOAppPlatform.scala +++ b/core/jvm/src/main/scala/cats/effect/internals/IOAppPlatform.scala @@ -30,8 +30,6 @@ private[effect] object IOAppPlatform { } def mainFiber(args: Array[String], timer: Eval[Timer[IO]])(run: List[String] => IO[ExitCode]): IO[Fiber[IO, Int]] = { - val _ = timer // unused in this platform - object Canceled extends RuntimeException for { latch <- IO(new CountDownLatch(1)) @@ -49,7 +47,7 @@ private[effect] object IOAppPlatform { } .productL(IO(latch.countDown())) .map(_.code) - .start + .start(timer.value) _ <- IO(sys.addShutdownHook { fiber.cancel.unsafeRunSync() latch.await() diff --git a/core/shared/src/main/scala/cats/effect/Concurrent.scala b/core/shared/src/main/scala/cats/effect/Concurrent.scala index ac6c766e5a..242fa590ea 100644 --- a/core/shared/src/main/scala/cats/effect/Concurrent.scala +++ b/core/shared/src/main/scala/cats/effect/Concurrent.scala @@ -142,7 +142,7 @@ import scala.util.Either @typeclass @implicitNotFound("""Cannot find implicit value for Concurrent[${F}]. Building this implicit value might depend on having an implicit -s.c.ExecutionContext in scope, a Scheduler or some equivalent type.""") +s.c.ExecutionContext in scope, a Timer, Scheduler or some equivalent type.""") trait Concurrent[F[_]] extends Async[F] { /** * Creates a cancelable `F[A]` instance that executes an diff --git a/core/shared/src/main/scala/cats/effect/ConcurrentEffect.scala b/core/shared/src/main/scala/cats/effect/ConcurrentEffect.scala index 28165fdcd4..15aa953904 100644 --- a/core/shared/src/main/scala/cats/effect/ConcurrentEffect.scala +++ b/core/shared/src/main/scala/cats/effect/ConcurrentEffect.scala @@ -37,7 +37,7 @@ import scala.util.Either @typeclass @implicitNotFound("""Cannot find implicit value for ConcurrentEffect[${F}]. Building this implicit value might depend on having an implicit -s.c.ExecutionContext in scope, a Scheduler or some equivalent type.""") +s.c.ExecutionContext in scope, a Timer, Scheduler or some equivalent type.""") trait ConcurrentEffect[F[_]] extends Concurrent[F] with Effect[F] { /** * Evaluates `F[_]` with the ability to cancel it. diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 9f6a10ede7..e2a1980806 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -18,6 +18,7 @@ package cats package effect import cats.arrow.FunctionK +import cats.syntax.apply._ import cats.effect.internals.Callback.Extensions import cats.effect.internals._ import cats.effect.internals.TrampolineEC.immediate @@ -364,13 +365,9 @@ sealed abstract class IO[+A] extends internals.IOBinaryCompat[A] { * consider in the example above what would happen if the first task * finishes in error. In that case the second task doesn't get canceled, * which creates a potential memory leak. - * - * IMPORTANT — this operation does not start with an asynchronous boundary. - * But you can use [[IO.shift(implicit* IO.shift]] to force an async - * boundary just before start. */ - final def start: IO[Fiber[IO, A @uncheckedVariance]] = - IOStart(this) + final def start(implicit timer: Timer[IO]): IO[Fiber[IO, A @uncheckedVariance]] = + IOStart(timer.shift *> this) /** * Returns a new `IO` that mirrors the source task for normal termination, @@ -674,17 +671,63 @@ private[effect] abstract class IOLowPriorityInstances extends IOParallelNewtype } implicit def ioSemigroup[A: Semigroup]: Semigroup[IO[A]] = new IOSemigroup[A] + + implicit val ioEffect: Effect[IO] = new IOEffect + + private[effect] class IOEffect extends Effect[IO] with StackSafeMonad[IO] { + override def pure[A](a: A): IO[A] = + IO.pure(a) + override def unit: IO[Unit] = + IO.unit + + override def map[A, B](fa: IO[A])(f: A => B): IO[B] = + fa.map(f) + override def flatMap[A, B](ioa: IO[A])(f: A => IO[B]): IO[B] = + ioa.flatMap(f) + + override def attempt[A](ioa: IO[A]): IO[Either[Throwable, A]] = + ioa.attempt + override def handleErrorWith[A](ioa: IO[A])(f: Throwable => IO[A]): IO[A] = + ioa.handleErrorWith(f) + override def raiseError[A](e: Throwable): IO[A] = + IO.raiseError(e) + + override def bracket[A, B](acquire: IO[A]) + (use: A => IO[B]) + (release: A => IO[Unit]): IO[B] = + acquire.bracket(use)(release) + + override def bracketCase[A, B](acquire: IO[A]) + (use: A => IO[B]) + (release: (A, ExitCase[Throwable]) => IO[Unit]): IO[B] = + acquire.bracketCase(use)(release) + + override def delay[A](thunk: => A): IO[A] = + IO(thunk) + override def suspend[A](thunk: => IO[A]): IO[A] = + IO.suspend(thunk) + override def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = + IO.async(k) + override def liftIO[A](ioa: IO[A]): IO[A] = + ioa + override def toIO[A](fa: IO[A]): IO[A] = + fa + override def runAsync[A](ioa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[Unit] = + ioa.runAsync(cb) + override def runSyncStep[A](ioa: IO[A]): IO[Either[IO[A], A]] = + ioa.runSyncStep + } } private[effect] abstract class IOInstances extends IOLowPriorityInstances { - implicit val parApplicative: Applicative[IO.Par] = new Applicative[IO.Par] { + implicit def parApplicative(implicit timer: Timer[IO]): Applicative[IO.Par] = new Applicative[IO.Par] { import IO.Par.{unwrap, apply => par} override def pure[A](x: A): IO.Par[A] = par(IO.pure(x)) override def map2[A, B, Z](fa: IO.Par[A], fb: IO.Par[B])(f: (A, B) => Z): IO.Par[Z] = - par(IOParMap(unwrap(fa), unwrap(fb))(f)) + par(IOParMap(timer.shift *> unwrap(fa), timer.shift *> unwrap(fb))(f)) override def ap[A, B](ff: IO.Par[A => B])(fa: IO.Par[A]): IO.Par[B] = map2(ff, fa)(_(_)) override def product[A, B](fa: IO.Par[A], fb: IO.Par[B]): IO.Par[(A, B)] = @@ -695,70 +738,32 @@ private[effect] abstract class IOInstances extends IOLowPriorityInstances { par(IO.unit) } - implicit val ioConcurrentEffect: ConcurrentEffect[IO] = new ConcurrentEffect[IO] { - override def pure[A](a: A): IO[A] = - IO.pure(a) - override def flatMap[A, B](ioa: IO[A])(f: A => IO[B]): IO[B] = - ioa.flatMap(f) - override def map[A, B](fa: IO[A])(f: A => B): IO[B] = - fa.map(f) - override def delay[A](thunk: => A): IO[A] = - IO(thunk) - override def unit: IO[Unit] = - IO.unit - override def attempt[A](ioa: IO[A]): IO[Either[Throwable, A]] = - ioa.attempt - override def handleErrorWith[A](ioa: IO[A])(f: Throwable => IO[A]): IO[A] = - ioa.handleErrorWith(f) - override def raiseError[A](e: Throwable): IO[A] = - IO.raiseError(e) - override def suspend[A](thunk: => IO[A]): IO[A] = - IO.suspend(thunk) + implicit def ioConcurrentEffect(implicit timer: Timer[IO]): ConcurrentEffect[IO] = new IOEffect with ConcurrentEffect[IO] { override def start[A](fa: IO[A]): IO[Fiber[IO, A]] = fa.start override def uncancelable[A](fa: IO[A]): IO[A] = fa.uncancelable override def onCancelRaiseError[A](fa: IO[A], e: Throwable): IO[A] = fa.onCancelRaiseError(e) - override def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = - IO.async(k) + override def race[A, B](fa: IO[A], fb: IO[B]): IO[Either[A, B]] = IO.race(fa, fb) override def racePair[A, B](fa: IO[A], fb: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = IO.racePair(fa, fb) - override def runAsync[A](ioa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[Unit] = - ioa.runAsync(cb) - override def runSyncStep[A](ioa: IO[A]): IO[Either[IO[A], A]] = - ioa.runSyncStep + override def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A] = IO.cancelable(k) override def runCancelable[A](fa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[IO[Unit]] = fa.runCancelable(cb) - override def toIO[A](fa: IO[A]): IO[A] = - fa - override def liftIO[A](ioa: IO[A]): IO[A] = - ioa - // this will use stack proportional to the maximum number of joined async suspensions - override def tailRecM[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = - f(a).flatMap { - case Left(a) => tailRecM(a)(f) - case Right(b) => pure(b) - } - override def bracket[A, B](acquire: IO[A]) - (use: A => IO[B]) - (release: A => IO[Unit]): IO[B] = - acquire.bracket(use)(release) - override def bracketCase[A, B](acquire: IO[A]) - (use: A => IO[B]) - (release: (A, ExitCase[Throwable]) => IO[Unit]): IO[B] = - acquire.bracketCase(use)(release) + + } - implicit val ioParallel: Parallel[IO, IO.Par] = + implicit def ioParallel(implicit timer: Timer[IO]): Parallel[IO, IO.Par] = new Parallel[IO, IO.Par] { - override def applicative: Applicative[IO.Par] = - parApplicative - override def monad: Monad[IO] = + override val applicative: Applicative[IO.Par] = + parApplicative(timer) + override val monad: Monad[IO] = ioConcurrentEffect override val sequential: ~>[IO.Par, IO] = new FunctionK[IO.Par, IO] { def apply[A](fa: IO.Par[A]): IO[A] = IO.Par.unwrap(fa) } @@ -1171,8 +1176,8 @@ object IO extends IOInstances { * Also see [[racePair]] for a version that does not cancel * the loser automatically on successful results. */ - def race[A, B](lh: IO[A], rh: IO[B]): IO[Either[A, B]] = - IORace.simple(lh, rh) + def race[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[A, B]] = + IORace.simple(timer.shift *> lh, timer.shift *> rh) /** * Run two IO tasks concurrently, and returns a pair @@ -1202,8 +1207,8 @@ object IO extends IOInstances { * See [[race]] for a simpler version that cancels the loser * immediately. */ - def racePair[A, B](lh: IO[A], rh: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = - IORace.pair(lh, rh) + def racePair[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = + IORace.pair(timer.shift *> lh, timer.shift *> rh) private[effect] final case class Pure[+A](a: A) extends IO[A] diff --git a/core/shared/src/main/scala/cats/effect/internals/IOBinaryCompat.scala b/core/shared/src/main/scala/cats/effect/internals/IOBinaryCompat.scala index 2ed7e3d648..5eb518ac70 100644 --- a/core/shared/src/main/scala/cats/effect/internals/IOBinaryCompat.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IOBinaryCompat.scala @@ -45,15 +45,6 @@ private[effect] trait IOBinaryCompat[+A] { self: IO[A] => } private[effect] trait IOCompanionBinaryCompat { - /** - * DEPRECATED — name and type changed in [[IO.ioConcurrentEffect]]. - * - * This old variant is kept in order to keep binary compatibility - * until 1.0 — when it will be removed completely. - */ - @deprecated("Renamed to ioConcurrentEffect", "0.10") - private[internals] def ioEffect: Effect[IO] = IO.ioConcurrentEffect - /** * DEPRECATED — the `ec` parameter is gone. * diff --git a/core/shared/src/test/scala/cats/effect/IOAppTests.scala b/core/shared/src/test/scala/cats/effect/IOAppTests.scala index c299a7b4d3..7fe5410888 100644 --- a/core/shared/src/test/scala/cats/effect/IOAppTests.scala +++ b/core/shared/src/test/scala/cats/effect/IOAppTests.scala @@ -20,14 +20,13 @@ package effect import cats.effect.internals.{IOAppPlatform, IOPlatform, TestUtils} import cats.implicits._ import org.scalatest.{AsyncFunSuite, BeforeAndAfterAll, Matchers} -import scala.util.Success class IOAppTests extends AsyncFunSuite with Matchers with BeforeAndAfterAll with TestUtils { test("exits with specified code") { IOAppPlatform.mainFiber(Array.empty, Eval.now(implicitly))(_ => IO.pure(ExitCode(42))) .flatMap(_.join) .unsafeToFuture - .value shouldEqual (Some(Success(42))) + .map(_ shouldEqual 42) } test("accepts arguments") { @@ -35,30 +34,28 @@ class IOAppTests extends AsyncFunSuite with Matchers with BeforeAndAfterAll with IO.pure(ExitCode(args.mkString.toInt))) .flatMap(_.join) .unsafeToFuture - .value shouldEqual (Some(Success(123))) + .map(_ shouldEqual 123) } test("raised error exits with 1") { - silenceSystemErr { + silenceSystemErrF { IOAppPlatform.mainFiber(Array.empty, Eval.now(implicitly))(_ => IO.raiseError(new Exception())) .flatMap(_.join) .unsafeToFuture - .value shouldEqual (Some(Success(1))) + .map(_ shouldEqual 1) } } test("canceled IO exits unsuccessfully") { assume(IOPlatform.isJVM, "test relevant only for the JVM") - silenceSystemErr { + silenceSystemErrF { (for { fiber <- IOAppPlatform.mainFiber(Array.empty, Eval.now(implicitly))(_ => IO.never) _ <- fiber.cancel code <- fiber.join } yield code) .unsafeToFuture - .value - .getOrElse(Success(0)) - .getOrElse(0) should be > 0 + .map(_ should be > 0) } } } diff --git a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala index 07b824db7b..11046d295d 100644 --- a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala +++ b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala @@ -140,10 +140,10 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers { _ <- av.put(20) r1 <- f1.join r2 <- f2.join - } yield List(r1,r2) + } yield Set(r1,r2) for (r <- task.unsafeToFuture()) yield { - r shouldBe List(10, 20) + r shouldBe Set(10, 20) } } @@ -159,10 +159,10 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers { _ <- f1.join _ <- f2.join _ <- f3.join - } yield List(r1, r2, r3) + } yield Set(r1, r2, r3) for (r <- task.unsafeToFuture()) yield { - r shouldBe List(10, 20, 30) + r shouldBe Set(10, 20, 30) } } @@ -178,10 +178,10 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers { r1 <- f1.join r2 <- f2.join r3 <- f3.join - } yield List(r1, r2, r3) + } yield Set(r1, r2, r3) for (r <- task.unsafeToFuture()) yield { - r shouldBe List(10, 20, 30) + r shouldBe Set(10, 20, 30) } } diff --git a/core/shared/src/test/scala/cats/effect/internals/TestUtils.scala b/core/shared/src/test/scala/cats/effect/internals/TestUtils.scala index faeee29911..2e5eddf478 100644 --- a/core/shared/src/test/scala/cats/effect/internals/TestUtils.scala +++ b/core/shared/src/test/scala/cats/effect/internals/TestUtils.scala @@ -16,7 +16,11 @@ package cats.effect.internals +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success, Try} + import java.io.{ByteArrayOutputStream, PrintStream} +import java.util.concurrent.Semaphore /** * INTERNAL API — test utilities. @@ -48,6 +52,35 @@ trait TestUtils { } } + /** + * Silences `System.err`, only printing the output in case exceptions are + * thrown by the executed `thunk` or future it produces is failed. + */ + + private[this] val lock = new Semaphore(1) + def silenceSystemErrF[A](thunk: => Future[A])(implicit ec: ExecutionContext): Future[A] = { + lock.acquire() + // Silencing System.err + val oldErr = System.err + val outStream = new ByteArrayOutputStream() + val fakeErr = new PrintStream(outStream) + System.setErr(fakeErr) + + Future.fromTry(Try(thunk)).flatten.andThen { + case Success(_) => + System.setErr(oldErr) + lock.release() + case Failure(e) => + System.setErr(oldErr) + lock.release() + // In case of errors, print whatever was caught + fakeErr.close() + val out = outStream.toString("utf-8") + if (out.nonEmpty) oldErr.println(out) + throw e + } + } + /** * Catches `System.err` output, for testing purposes. */ diff --git a/laws/shared/src/main/scala/cats/effect/laws/ConcurrentEffectLaws.scala b/laws/shared/src/main/scala/cats/effect/laws/ConcurrentEffectLaws.scala index 7666d6c19a..23d02a0edd 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/ConcurrentEffectLaws.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/ConcurrentEffectLaws.scala @@ -60,7 +60,8 @@ trait ConcurrentEffectLaws[F[_]] extends ConcurrentLaws[F] with EffectLaws[F] { } object ConcurrentEffectLaws { - def apply[F[_]](implicit F0: ConcurrentEffect[F]): ConcurrentEffectLaws[F] = new ConcurrentEffectLaws[F] { + def apply[F[_]](implicit F0: ConcurrentEffect[F], ioTimer0: Timer[IO]): ConcurrentEffectLaws[F] = new ConcurrentEffectLaws[F] { val F = F0 + val ioTimer = ioTimer0 } } diff --git a/laws/shared/src/main/scala/cats/effect/laws/ConcurrentLaws.scala b/laws/shared/src/main/scala/cats/effect/laws/ConcurrentLaws.scala index bbd12a6706..80516a07ab 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/ConcurrentLaws.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/ConcurrentLaws.scala @@ -24,6 +24,7 @@ import scala.Predef.{identity => id} trait ConcurrentLaws[F[_]] extends AsyncLaws[F] { implicit def F: Concurrent[F] + implicit val ioTimer: Timer[IO] def cancelOnBracketReleases[A, B](a: A, f: (A, A) => B) = { val received = for { @@ -243,7 +244,8 @@ trait ConcurrentLaws[F[_]] extends AsyncLaws[F] { } object ConcurrentLaws { - def apply[F[_]](implicit F0: Concurrent[F]): ConcurrentLaws[F] = new ConcurrentLaws[F] { + def apply[F[_]](implicit F0: Concurrent[F], ioTimer0: Timer[IO]): ConcurrentLaws[F] = new ConcurrentLaws[F] { val F = F0 + val ioTimer = ioTimer0 } } diff --git a/laws/shared/src/main/scala/cats/effect/laws/discipline/ConcurrentEffectTests.scala b/laws/shared/src/main/scala/cats/effect/laws/discipline/ConcurrentEffectTests.scala index 5dedc848f1..0452837178 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/discipline/ConcurrentEffectTests.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/discipline/ConcurrentEffectTests.scala @@ -70,7 +70,7 @@ trait ConcurrentEffectTests[F[_]] extends ConcurrentTests[F] with EffectTests[F] } object ConcurrentEffectTests { - def apply[F[_]: ConcurrentEffect]: ConcurrentEffectTests[F] = new ConcurrentEffectTests[F] { + def apply[F[_]: ConcurrentEffect](implicit ioTimer: Timer[IO]): ConcurrentEffectTests[F] = new ConcurrentEffectTests[F] { def laws = ConcurrentEffectLaws[F] } } diff --git a/laws/shared/src/main/scala/cats/effect/laws/discipline/ConcurrentTests.scala b/laws/shared/src/main/scala/cats/effect/laws/discipline/ConcurrentTests.scala index b069b9f412..1047b7b852 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/discipline/ConcurrentTests.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/discipline/ConcurrentTests.scala @@ -85,7 +85,7 @@ trait ConcurrentTests[F[_]] extends AsyncTests[F] { } object ConcurrentTests { - def apply[F[_]: Concurrent]: ConcurrentTests[F] = new ConcurrentTests[F] { + def apply[F[_]: Concurrent](implicit ioTimer: Timer[IO]): ConcurrentTests[F] = new ConcurrentTests[F] { def laws = ConcurrentLaws[F] } } diff --git a/laws/shared/src/test/scala/cats/effect/IOTests.scala b/laws/shared/src/test/scala/cats/effect/IOTests.scala index 2ef1d9039c..0ffe6e2331 100644 --- a/laws/shared/src/test/scala/cats/effect/IOTests.scala +++ b/laws/shared/src/test/scala/cats/effect/IOTests.scala @@ -47,7 +47,7 @@ class IOTests extends BaseTestsSuite { ConcurrentEffectTests[IO].concurrentEffect[Int, Int, Int] }) - test("IO.Par's applicative instance is different") { + testAsync("IO.Par's applicative instance is different") { implicit ec => implicitly[Applicative[IO]] shouldNot be(implicitly[Applicative[IO.Par]]) } @@ -532,6 +532,7 @@ class IOTests extends BaseTestsSuite { val io = (0 until count).foldLeft(IO(0))((acc, e) => (acc, IO(e)).parMapN(_ + _)) val f = io.unsafeToFuture() + ec.tick(1.day) f.value shouldEqual Some(Success(count * (count - 1) / 2)) } @@ -733,8 +734,6 @@ object IOTests { fa.runCancelable(cb) def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A] = IO.cancelable(k) - def start[A](fa: IO[A]): IO[Fiber[IO, A]] = - fa.start def bracketCase[A, B](acquire: IO[A]) (use: A => IO[B]) (release: (A, ExitCase[Throwable]) => IO[Unit]): IO[B] = diff --git a/site/src/main/tut/concurrency/mvar.md b/site/src/main/tut/concurrency/mvar.md index 05e84da494..36a27f39c5 100644 --- a/site/src/main/tut/concurrency/mvar.md +++ b/site/src/main/tut/concurrency/mvar.md @@ -149,8 +149,8 @@ def consumer(ch: Channel[Int], sum: Long): IO[Long] = for { channel <- MVar[IO].empty[Option[Int]] count = 100000 - producerTask = IO.shift *> producer(channel, (0 until count).toList) - consumerTask = IO.shift *> consumer(channel, 0L) + producerTask = producer(channel, (0 until count).toList) + consumerTask = consumer(channel, 0L) fp <- producerTask.start fc <- consumerTask.start diff --git a/site/src/main/tut/concurrency/ref.md b/site/src/main/tut/concurrency/ref.md index 31310644ba..4c84a950e7 100644 --- a/site/src/main/tut/concurrency/ref.md +++ b/site/src/main/tut/concurrency/ref.md @@ -72,9 +72,9 @@ val program: IO[Unit] = w2 = new Worker[IO](2, ref) w3 = new Worker[IO](3, ref) _ <- List( - IO.shift *> w1.start, - IO.shift *> w2.start, - IO.shift *> w3.start + w1.start, + w2.start, + w3.start ).parSequence.void } yield () ``` diff --git a/site/src/main/tut/concurrency/semaphore.md b/site/src/main/tut/concurrency/semaphore.md index 8115666ffc..8bc7e7d090 100644 --- a/site/src/main/tut/concurrency/semaphore.md +++ b/site/src/main/tut/concurrency/semaphore.md @@ -83,6 +83,6 @@ val program: IO[Unit] = r1 = new PreciousResource[IO]("R1", s) r2 = new PreciousResource[IO]("R2", s) r3 = new PreciousResource[IO]("R3", s) - _ <- List(IO.shift *> r1.use, IO.shift *> r2.use, IO.shift *> r3.use).parSequence.void + _ <- List(r1.use, r2.use, r3.use).parSequence.void } yield () ``` diff --git a/site/src/main/tut/datatypes/fiber.md b/site/src/main/tut/datatypes/fiber.md index 078029900b..360d629f61 100644 --- a/site/src/main/tut/datatypes/fiber.md +++ b/site/src/main/tut/datatypes/fiber.md @@ -25,7 +25,7 @@ import cats.implicits._ import scala.concurrent.ExecutionContext.Implicits.global -val io = IO.shift *> IO(println("Hello!")) +val io = IO(println("Hello!")) val fiber: IO[Fiber[IO, Unit]] = io.start ``` @@ -36,7 +36,7 @@ val launchMissiles = IO.raiseError(new Exception("boom!")) val runToBunker = IO(println("To the bunker!!!")) for { - fiber <- IO.shift *> launchMissiles.start + fiber <- launchMissiles.start _ <- runToBunker.handleErrorWith { error => // Retreat failed, cancel launch (maybe we should // have retreated to our bunker before the launch?) diff --git a/site/src/main/tut/datatypes/io.md b/site/src/main/tut/datatypes/io.md index ccbb85dabc..0216d3fdfb 100644 --- a/site/src/main/tut/datatypes/io.md +++ b/site/src/main/tut/datatypes/io.md @@ -580,7 +580,7 @@ val launchMissiles = IO.raiseError(new Exception("boom!")) val runToBunker = IO(println("To the bunker!!!")) for { - fiber <- IO.shift *> launchMissiles.start + fiber <- launchMissiles.start _ <- runToBunker.handleErrorWith { error => // Retreat failed, cancel launch (maybe we should // have retreated to our bunker before the launch?) @@ -594,10 +594,6 @@ for { Implementation notes: -- `start` does NOT fork automatically, only asynchronous `IO` values - will actually execute concurrently via `start`, so in order to - ensure parallel execution for synchronous actions, then you can use - `IO.shift` (remember, with `IO` such behavior is always explicit!) - the `*>` operator is defined in Cats and you can treat it as an alias for `lh.flatMap(_ => rh)` @@ -1211,14 +1207,14 @@ Since the introduction of the [Parallel](https://github.com/typelevel/cats/blob/ ### parMapN -It has the potential to run an arbitrary number of `IO`s in parallel, as long as the `IO` values have asynchronous execution, and it allows you to apply a function to the result (as in `map`). It finishes processing when all the `IO`s are completed, either successfully or with a failure. For example: +It has the potential to run an arbitrary number of `IO`s in parallel, and it allows you to apply a function to the result (as in `map`). It finishes processing when all the `IO`s are completed, either successfully or with a failure. For example: ```tut:book import cats.syntax.all._ -val ioA = IO.shift *> IO(println("Running ioA")) -val ioB = IO.shift *> IO(println("Running ioB")) -val ioC = IO.shift *> IO(println("Running ioC")) +val ioA = IO(println("Running ioA")) +val ioB = IO(println("Running ioB")) +val ioC = IO(println("Running ioC")) val program = (ioA, ioB, ioC).parMapN { (_, _, _) => () } @@ -1228,8 +1224,8 @@ program.unsafeRunSync() If any of the `IO`s completes with a failure then the result of the whole computation will be failed but not until all the `IO`s are completed. Example: ```tut:nofail -val a = IO.shift *> (IO.raiseError[Unit](new Exception("boom")) <* IO(println("Running ioA"))) -val b = IO.shift *> IO(println("Running ioB")) +val a = IO.raiseError[Unit](new Exception("boom")) <* IO(println("Running ioA")) +val b = IO(println("Running ioB")) val parFailure = (a, b).parMapN { (_, _) => () } @@ -1240,35 +1236,22 @@ If one of the tasks fails immediately, then the other gets canceled and the comp ```tut:silent val ioA = Timer[IO].sleep(10.seconds) *> IO(println("Delayed!")) -val ioB = IO.shift *> IO.raiseError[Unit](new Exception("dummy")) +val ioB = IO.raiseError[Unit](new Exception("dummy")) (ioA, ioB).parMapN((_, _) => ()) ``` -Note that the following example **will not run in parallel** because it's missing the asynchronous execution: - -```tut:book -val c = IO(println("Hey C!")) -val d = IO(println("Hey D!")) - -val nonParallel = (c, d).parMapN { (_, _) => () } - -nonParallel.unsafeRunSync() -``` - -With `IO` thread forking or call-stack shifting has to be explicit. This goes for `parMapN` and for `start` as well. If scheduling fairness is a concern, then asynchronous boundaries have to be explicit. - ### parSequence -If you have a list of IO, and you want a single IO with the result list you can use `parSequence` which executes the IO tasks in parallel. The IO tasks must be asynchronous, which if they are not you can use [shift](#shift). +If you have a list of IO, and you want a single IO with the result list you can use `parSequence` which executes the IO tasks in parallel. ```tut:book import cats._, cats.data._, cats.syntax.all._, cats.effect.IO -val asyncIO = IO.shift *> IO(1) +val anIO = IO(1) val aLotOfIOs = - NonEmptyList.of(asyncIO, asyncIO) + NonEmptyList.of(anIO, anIO) val ioOfList = aLotOfIOs.parSequence ``` @@ -1277,11 +1260,11 @@ There is also `cats.Traverse.sequence` which does this synchronously. ### parTraverse -If you have a list of data and a way of turning each item into an IO, but you want a single IO for the results you can use `parTraverse` to run the steps in parallel. The IO tasks must be asynchronous, which if they are not you can use shift. +If you have a list of data and a way of turning each item into an IO, but you want a single IO for the results you can use `parSequence` to run the steps in parallel. ```tut:book val results = NonEmptyList.of(1, 2, 3).parTraverse { i => - IO.shift *> IO(i) + IO(i) } ``` From f07fcee525769caf59dadd461ffc6ad7ea23663b Mon Sep 17 00:00:00 2001 From: Oleg Pyzhcov Date: Sun, 20 May 2018 11:41:12 +0300 Subject: [PATCH 2/6] Undo Future silencing, use immediate EC in IOAppTests --- .../test/scala/cats/effect/IOAppTests.scala | 10 ++++-- .../cats/effect/internals/TestUtils.scala | 33 ------------------- 2 files changed, 7 insertions(+), 36 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/IOAppTests.scala b/core/shared/src/test/scala/cats/effect/IOAppTests.scala index 7fe5410888..20f32ce7aa 100644 --- a/core/shared/src/test/scala/cats/effect/IOAppTests.scala +++ b/core/shared/src/test/scala/cats/effect/IOAppTests.scala @@ -17,7 +17,9 @@ package cats package effect -import cats.effect.internals.{IOAppPlatform, IOPlatform, TestUtils} +import scala.concurrent.ExecutionContext + +import cats.effect.internals.{IOAppPlatform, IOPlatform, TestUtils, TrampolineEC} import cats.implicits._ import org.scalatest.{AsyncFunSuite, BeforeAndAfterAll, Matchers} @@ -38,7 +40,7 @@ class IOAppTests extends AsyncFunSuite with Matchers with BeforeAndAfterAll with } test("raised error exits with 1") { - silenceSystemErrF { + silenceSystemErr { IOAppPlatform.mainFiber(Array.empty, Eval.now(implicitly))(_ => IO.raiseError(new Exception())) .flatMap(_.join) .unsafeToFuture @@ -48,7 +50,7 @@ class IOAppTests extends AsyncFunSuite with Matchers with BeforeAndAfterAll with test("canceled IO exits unsuccessfully") { assume(IOPlatform.isJVM, "test relevant only for the JVM") - silenceSystemErrF { + silenceSystemErr { (for { fiber <- IOAppPlatform.mainFiber(Array.empty, Eval.now(implicitly))(_ => IO.never) _ <- fiber.cancel @@ -58,4 +60,6 @@ class IOAppTests extends AsyncFunSuite with Matchers with BeforeAndAfterAll with .map(_ should be > 0) } } + + override implicit def executionContext: ExecutionContext = TrampolineEC.immediate } diff --git a/core/shared/src/test/scala/cats/effect/internals/TestUtils.scala b/core/shared/src/test/scala/cats/effect/internals/TestUtils.scala index 2e5eddf478..faeee29911 100644 --- a/core/shared/src/test/scala/cats/effect/internals/TestUtils.scala +++ b/core/shared/src/test/scala/cats/effect/internals/TestUtils.scala @@ -16,11 +16,7 @@ package cats.effect.internals -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success, Try} - import java.io.{ByteArrayOutputStream, PrintStream} -import java.util.concurrent.Semaphore /** * INTERNAL API — test utilities. @@ -52,35 +48,6 @@ trait TestUtils { } } - /** - * Silences `System.err`, only printing the output in case exceptions are - * thrown by the executed `thunk` or future it produces is failed. - */ - - private[this] val lock = new Semaphore(1) - def silenceSystemErrF[A](thunk: => Future[A])(implicit ec: ExecutionContext): Future[A] = { - lock.acquire() - // Silencing System.err - val oldErr = System.err - val outStream = new ByteArrayOutputStream() - val fakeErr = new PrintStream(outStream) - System.setErr(fakeErr) - - Future.fromTry(Try(thunk)).flatten.andThen { - case Success(_) => - System.setErr(oldErr) - lock.release() - case Failure(e) => - System.setErr(oldErr) - lock.release() - // In case of errors, print whatever was caught - fakeErr.close() - val out = outStream.toString("utf-8") - if (out.nonEmpty) oldErr.println(out) - throw e - } - } - /** * Catches `System.err` output, for testing purposes. */ From 1729f6b528246f26bfa0a7d4334be9d0a943416c Mon Sep 17 00:00:00 2001 From: Oleg Pyzhcov Date: Sun, 20 May 2018 11:48:14 +0300 Subject: [PATCH 3/6] Make methods in TC instances final --- .../src/main/scala/cats/effect/IO.scala | 71 +++++++++---------- 1 file changed, 35 insertions(+), 36 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index e2a1980806..01b73c23d2 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -675,46 +675,46 @@ private[effect] abstract class IOLowPriorityInstances extends IOParallelNewtype implicit val ioEffect: Effect[IO] = new IOEffect private[effect] class IOEffect extends Effect[IO] with StackSafeMonad[IO] { - override def pure[A](a: A): IO[A] = + final override def pure[A](a: A): IO[A] = IO.pure(a) - override def unit: IO[Unit] = + final override def unit: IO[Unit] = IO.unit - override def map[A, B](fa: IO[A])(f: A => B): IO[B] = + final override def map[A, B](fa: IO[A])(f: A => B): IO[B] = fa.map(f) - override def flatMap[A, B](ioa: IO[A])(f: A => IO[B]): IO[B] = + final override def flatMap[A, B](ioa: IO[A])(f: A => IO[B]): IO[B] = ioa.flatMap(f) - override def attempt[A](ioa: IO[A]): IO[Either[Throwable, A]] = + final override def attempt[A](ioa: IO[A]): IO[Either[Throwable, A]] = ioa.attempt - override def handleErrorWith[A](ioa: IO[A])(f: Throwable => IO[A]): IO[A] = + final override def handleErrorWith[A](ioa: IO[A])(f: Throwable => IO[A]): IO[A] = ioa.handleErrorWith(f) - override def raiseError[A](e: Throwable): IO[A] = + final override def raiseError[A](e: Throwable): IO[A] = IO.raiseError(e) - override def bracket[A, B](acquire: IO[A]) + final override def bracket[A, B](acquire: IO[A]) (use: A => IO[B]) (release: A => IO[Unit]): IO[B] = acquire.bracket(use)(release) - override def bracketCase[A, B](acquire: IO[A]) + final override def bracketCase[A, B](acquire: IO[A]) (use: A => IO[B]) (release: (A, ExitCase[Throwable]) => IO[Unit]): IO[B] = acquire.bracketCase(use)(release) - override def delay[A](thunk: => A): IO[A] = + final override def delay[A](thunk: => A): IO[A] = IO(thunk) - override def suspend[A](thunk: => IO[A]): IO[A] = + final override def suspend[A](thunk: => IO[A]): IO[A] = IO.suspend(thunk) - override def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = + final override def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = IO.async(k) - override def liftIO[A](ioa: IO[A]): IO[A] = + final override def liftIO[A](ioa: IO[A]): IO[A] = ioa - override def toIO[A](fa: IO[A]): IO[A] = + final override def toIO[A](fa: IO[A]): IO[A] = fa - override def runAsync[A](ioa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[Unit] = + final override def runAsync[A](ioa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[Unit] = ioa.runAsync(cb) - override def runSyncStep[A](ioa: IO[A]): IO[Either[IO[A], A]] = + final override def runSyncStep[A](ioa: IO[A]): IO[Either[IO[A], A]] = ioa.runSyncStep } } @@ -724,59 +724,58 @@ private[effect] abstract class IOInstances extends IOLowPriorityInstances { implicit def parApplicative(implicit timer: Timer[IO]): Applicative[IO.Par] = new Applicative[IO.Par] { import IO.Par.{unwrap, apply => par} - override def pure[A](x: A): IO.Par[A] = + final override def pure[A](x: A): IO.Par[A] = par(IO.pure(x)) - override def map2[A, B, Z](fa: IO.Par[A], fb: IO.Par[B])(f: (A, B) => Z): IO.Par[Z] = + final override def map2[A, B, Z](fa: IO.Par[A], fb: IO.Par[B])(f: (A, B) => Z): IO.Par[Z] = par(IOParMap(timer.shift *> unwrap(fa), timer.shift *> unwrap(fb))(f)) - override def ap[A, B](ff: IO.Par[A => B])(fa: IO.Par[A]): IO.Par[B] = + final override def ap[A, B](ff: IO.Par[A => B])(fa: IO.Par[A]): IO.Par[B] = map2(ff, fa)(_(_)) - override def product[A, B](fa: IO.Par[A], fb: IO.Par[B]): IO.Par[(A, B)] = + final override def product[A, B](fa: IO.Par[A], fb: IO.Par[B]): IO.Par[(A, B)] = map2(fa, fb)((_, _)) - override def map[A, B](fa: IO.Par[A])(f: A => B): IO.Par[B] = + final override def map[A, B](fa: IO.Par[A])(f: A => B): IO.Par[B] = par(unwrap(fa).map(f)) - override def unit: IO.Par[Unit] = + final override def unit: IO.Par[Unit] = par(IO.unit) } implicit def ioConcurrentEffect(implicit timer: Timer[IO]): ConcurrentEffect[IO] = new IOEffect with ConcurrentEffect[IO] { - override def start[A](fa: IO[A]): IO[Fiber[IO, A]] = + final override def start[A](fa: IO[A]): IO[Fiber[IO, A]] = fa.start - override def uncancelable[A](fa: IO[A]): IO[A] = + final override def uncancelable[A](fa: IO[A]): IO[A] = fa.uncancelable - override def onCancelRaiseError[A](fa: IO[A], e: Throwable): IO[A] = + final override def onCancelRaiseError[A](fa: IO[A], e: Throwable): IO[A] = fa.onCancelRaiseError(e) - override def race[A, B](fa: IO[A], fb: IO[B]): IO[Either[A, B]] = + final override def race[A, B](fa: IO[A], fb: IO[B]): IO[Either[A, B]] = IO.race(fa, fb) - override def racePair[A, B](fa: IO[A], fb: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = + final override def racePair[A, B](fa: IO[A], fb: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = IO.racePair(fa, fb) - override def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A] = + final override def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A] = IO.cancelable(k) - override def runCancelable[A](fa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[IO[Unit]] = + final override def runCancelable[A](fa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[IO[Unit]] = fa.runCancelable(cb) - } implicit def ioParallel(implicit timer: Timer[IO]): Parallel[IO, IO.Par] = new Parallel[IO, IO.Par] { - override val applicative: Applicative[IO.Par] = + final override val applicative: Applicative[IO.Par] = parApplicative(timer) - override val monad: Monad[IO] = + final override val monad: Monad[IO] = ioConcurrentEffect - override val sequential: ~>[IO.Par, IO] = + final override val sequential: ~>[IO.Par, IO] = new FunctionK[IO.Par, IO] { def apply[A](fa: IO.Par[A]): IO[A] = IO.Par.unwrap(fa) } - override val parallel: ~>[IO, IO.Par] = + final override val parallel: ~>[IO, IO.Par] = new FunctionK[IO, IO.Par] { def apply[A](fa: IO[A]): IO.Par[A] = IO.Par(fa) } } implicit def ioMonoid[A: Monoid]: Monoid[IO[A]] = new IOSemigroup[A] with Monoid[IO[A]] { - def empty = IO.pure(Monoid[A].empty) + final override def empty: IO[A] = IO.pure(Monoid[A].empty) } implicit val ioSemigroupK: SemigroupK[IO] = new SemigroupK[IO] { - def combineK[A](a: IO[A], b: IO[A]): IO[A] = + final override def combineK[A](a: IO[A], b: IO[A]): IO[A] = ApplicativeError[IO, Throwable].handleErrorWith(a)(_ => b) } } From 5107d1986b6c5c81839f280ed96342ae36d54227 Mon Sep 17 00:00:00 2001 From: Oleg Pyzhcov Date: Sun, 20 May 2018 13:03:47 +0300 Subject: [PATCH 4/6] Move "shift *>" down the call stack; fix nondeterministic test failure --- .../src/main/scala/cats/effect/IO.scala | 15 +++++++----- .../cats/effect/internals/IOParMap.scala | 13 ++++++---- .../scala/cats/effect/internals/IORace.scala | 13 +++++----- .../src/test/scala/cats/effect/IOTests.scala | 24 ++++--------------- 4 files changed, 28 insertions(+), 37 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 01b73c23d2..858ecf6285 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -708,9 +708,9 @@ private[effect] abstract class IOLowPriorityInstances extends IOParallelNewtype IO.suspend(thunk) final override def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = IO.async(k) - final override def liftIO[A](ioa: IO[A]): IO[A] = + override def liftIO[A](ioa: IO[A]): IO[A] = ioa - final override def toIO[A](fa: IO[A]): IO[A] = + override def toIO[A](fa: IO[A]): IO[A] = fa final override def runAsync[A](ioa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[Unit] = ioa.runAsync(cb) @@ -727,7 +727,7 @@ private[effect] abstract class IOInstances extends IOLowPriorityInstances { final override def pure[A](x: A): IO.Par[A] = par(IO.pure(x)) final override def map2[A, B, Z](fa: IO.Par[A], fb: IO.Par[B])(f: (A, B) => Z): IO.Par[Z] = - par(IOParMap(timer.shift *> unwrap(fa), timer.shift *> unwrap(fb))(f)) + par(IOParMap(timer, unwrap(fa), unwrap(fb))(f)) final override def ap[A, B](ff: IO.Par[A => B])(fa: IO.Par[A]): IO.Par[B] = map2(ff, fa)(_(_)) final override def product[A, B](fa: IO.Par[A], fb: IO.Par[B]): IO.Par[(A, B)] = @@ -756,6 +756,8 @@ private[effect] abstract class IOInstances extends IOLowPriorityInstances { final override def runCancelable[A](fa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[IO[Unit]] = fa.runCancelable(cb) + final override def toIO[A](fa: IO[A]): IO[A] = fa + final override def liftIO[A](ioa: IO[A]): IO[A] = ioa } implicit def ioParallel(implicit timer: Timer[IO]): Parallel[IO, IO.Par] = @@ -763,7 +765,8 @@ private[effect] abstract class IOInstances extends IOLowPriorityInstances { final override val applicative: Applicative[IO.Par] = parApplicative(timer) final override val monad: Monad[IO] = - ioConcurrentEffect + ioConcurrentEffect(timer) + final override val sequential: ~>[IO.Par, IO] = new FunctionK[IO.Par, IO] { def apply[A](fa: IO.Par[A]): IO[A] = IO.Par.unwrap(fa) } final override val parallel: ~>[IO, IO.Par] = @@ -1176,7 +1179,7 @@ object IO extends IOInstances { * the loser automatically on successful results. */ def race[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[A, B]] = - IORace.simple(timer.shift *> lh, timer.shift *> rh) + IORace.simple(timer, lh, rh) /** * Run two IO tasks concurrently, and returns a pair @@ -1207,7 +1210,7 @@ object IO extends IOInstances { * immediately. */ def racePair[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = - IORace.pair(timer.shift *> lh, timer.shift *> rh) + IORace.pair(timer, lh, rh) private[effect] final case class Pure[+A](a: A) extends IO[A] diff --git a/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala b/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala index 3038a088f9..1daaf12f6c 100644 --- a/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala @@ -16,8 +16,10 @@ package cats.effect.internals -import cats.effect.IO +import cats.syntax.apply._ +import cats.effect.{IO, Timer} import cats.effect.internals.Callback.Extensions + import java.util.concurrent.atomic.AtomicReference import scala.concurrent.ExecutionContext @@ -25,7 +27,7 @@ private[effect] object IOParMap { import Callback.{Type => Callback} /** Implementation for `parMap2`. */ - def apply[A, B, C](fa: IO[A], fb: IO[B])(f: (A, B) => C): IO[C] = + def apply[A, B, C](timer: Timer[IO], fa: IO[A], fb: IO[B])(f: (A, B) => C): IO[C] = IO.Async { (conn, cb) => // For preventing stack-overflow errors; using a // trampolined execution context, so no thread forks @@ -54,8 +56,8 @@ private[effect] object IOParMap { // NOTE: conn.pop() happens when cb gets called! conn.push(() => Cancelable.cancelAll(connA.cancel, connB.cancel)) - IORunLoop.startCancelable(fa, connA, callbackA(connB)) - IORunLoop.startCancelable(fb, connB, callbackB(connA)) + IORunLoop.startCancelable(timer.shift *> fa, connA, callbackA(connB)) + IORunLoop.startCancelable(timer.shift *> fb, connB, callbackB(connA)) } /** Callback for the left task. */ @@ -98,7 +100,7 @@ private[effect] object IOParMap { } /** Called when an error is generated. */ - def sendError(other: IOConnection, e: Throwable): Unit = + def sendError(other: IOConnection, e: Throwable): Unit = { state.getAndSet(e) match { case _: Throwable => Logger.reportFailure(e) @@ -107,6 +109,7 @@ private[effect] object IOParMap { try other.cancel() finally cb.async(conn, Left(e)) } + } }) } } diff --git a/core/shared/src/main/scala/cats/effect/internals/IORace.scala b/core/shared/src/main/scala/cats/effect/internals/IORace.scala index 3de8a8b028..970c07ad01 100644 --- a/core/shared/src/main/scala/cats/effect/internals/IORace.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IORace.scala @@ -19,6 +19,7 @@ package internals import java.util.concurrent.atomic.AtomicBoolean import scala.concurrent.Promise +import cats.syntax.apply._ import cats.effect.internals.Callback.{Type => Callback} import cats.effect.internals.Callback.Extensions @@ -28,7 +29,7 @@ private[effect] object IORace { * but this way it is more efficient, as we no longer have to keep * internal promises. */ - def simple[A, B](lh: IO[A], rh: IO[B]): IO[Either[A, B]] = { + def simple[A, B](timer: Timer[IO], lh: IO[A], rh: IO[B]): IO[Either[A, B]] = { // Signals successful results def onSuccess[T, U]( isActive: AtomicBoolean, @@ -65,7 +66,7 @@ private[effect] object IORace { val connR = IOConnection() // Starts concurrent execution for the left value - IORunLoop.startCancelable[A](lh, connL, { + IORunLoop.startCancelable[A](timer.shift *> lh, connL, { case Right(a) => onSuccess(active, connR, cb, Left(a)) case Left(err) => @@ -73,7 +74,7 @@ private[effect] object IORace { }) // Starts concurrent execution for the right value - IORunLoop.startCancelable[B](rh, connR, { + IORunLoop.startCancelable[B](timer.shift *> rh, connR, { case Right(b) => onSuccess(active, connL, cb, Right(b)) case Left(err) => @@ -88,7 +89,7 @@ private[effect] object IORace { /** * Implementation for `IO.racePair` */ - def pair[A, B](lh: IO[A], rh: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = { + def pair[A, B](timer: Timer[IO], lh: IO[A], rh: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = { IO.cancelable { cb => val active = new AtomicBoolean(true) // Cancelable connection for the left value @@ -99,7 +100,7 @@ private[effect] object IORace { val promiseR = Promise[Either[Throwable, B]]() // Starts concurrent execution for the left value - IORunLoop.startCancelable[A](lh, connL, { + IORunLoop.startCancelable[A](timer.shift *> lh, connL, { case Right(a) => if (active.getAndSet(false)) cb.async(Right(Left((a, IOFiber.build[B](promiseR, connR))))) @@ -116,7 +117,7 @@ private[effect] object IORace { }) // Starts concurrent execution for the right value - IORunLoop.startCancelable[B](rh, connR, { + IORunLoop.startCancelable[B](timer.shift *> rh, connR, { case Right(b) => if (active.getAndSet(false)) cb.async(Right(Right((IOFiber.build[A](promiseL, connL), b)))) diff --git a/laws/shared/src/test/scala/cats/effect/IOTests.scala b/laws/shared/src/test/scala/cats/effect/IOTests.scala index 0ffe6e2331..4a01285c75 100644 --- a/laws/shared/src/test/scala/cats/effect/IOTests.scala +++ b/laws/shared/src/test/scala/cats/effect/IOTests.scala @@ -493,38 +493,22 @@ class IOTests extends BaseTestsSuite { f2.value shouldEqual Some(Failure(dummy)) } - testAsync("parMap2 can fail for both, with left failing first") { implicit ec => + testAsync("parMap2 can fail for both, with non-deterministic failure") { implicit ec => val error = catchSystemErr { val dummy1 = new RuntimeException("dummy1") val dummy2 = new RuntimeException("dummy2") val io1 = IO.raiseError[Int](dummy1) - val io2 = IO.shift *> IO.raiseError[Int](dummy2) - val io3 = (io1, io2).parMapN(_ + _) - - val f1 = io3.unsafeToFuture() - ec.tick() - f1.value shouldBe Some(Failure(dummy1)) - } - - error should include("dummy2") - } - - testAsync("parMap2 can fail for both, with right failing first") { implicit ec => - val error = catchSystemErr { - val dummy1 = new RuntimeException("dummy1") - val dummy2 = new RuntimeException("dummy2") - - val io1 = IO.shift *> IO.raiseError[Int](dummy1) val io2 = IO.raiseError[Int](dummy2) val io3 = (io1, io2).parMapN(_ + _) val f1 = io3.unsafeToFuture() ec.tick() - f1.value shouldBe Some(Failure(dummy2)) + val exc = f1.value.get.failed.get + exc should (be (dummy1) or be (dummy2)) } - error should include("dummy1") + error should include("dummy") } testAsync("parMap2 is stack safe") { implicit ec => From a3b9e3b697a2b7d17046d6355bdd660ba161dddc Mon Sep 17 00:00:00 2001 From: Oleg Pyzhcov Date: Tue, 22 May 2018 16:23:31 +0300 Subject: [PATCH 5/6] Final batch of fixes - Reverted misfix in documentation. Added note on creating Timer[IO]s. - Fixed failing MVar tests - Moved timer.shift from IO.start deeper to IOStart --- .../src/main/scala/cats/effect/IO.scala | 3 +-- .../scala/cats/effect/internals/IOStart.scala | 7 ++--- .../cats/effect/concurrent/MVarTests.scala | 11 +++++--- site/src/main/tut/datatypes/io.md | 27 +++++++++---------- 4 files changed, 24 insertions(+), 24 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 858ecf6285..2841a5443f 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -18,7 +18,6 @@ package cats package effect import cats.arrow.FunctionK -import cats.syntax.apply._ import cats.effect.internals.Callback.Extensions import cats.effect.internals._ import cats.effect.internals.TrampolineEC.immediate @@ -367,7 +366,7 @@ sealed abstract class IO[+A] extends internals.IOBinaryCompat[A] { * which creates a potential memory leak. */ final def start(implicit timer: Timer[IO]): IO[Fiber[IO, A @uncheckedVariance]] = - IOStart(timer.shift *> this) + IOStart(timer, this) /** * Returns a new `IO` that mirrors the source task for normal termination, diff --git a/core/shared/src/main/scala/cats/effect/internals/IOStart.scala b/core/shared/src/main/scala/cats/effect/internals/IOStart.scala index 5e02d7c73e..efd286cb6d 100644 --- a/core/shared/src/main/scala/cats/effect/internals/IOStart.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IOStart.scala @@ -16,7 +16,8 @@ package cats.effect.internals -import cats.effect.{Fiber, IO} +import cats.syntax.apply._ +import cats.effect.{Fiber, IO, Timer} import cats.effect.internals.TrampolineEC.immediate import scala.concurrent.{ExecutionContext, Promise} @@ -24,7 +25,7 @@ private[effect] object IOStart { /** * Implementation for `IO.start`. */ - def apply[A](fa: IO[A]): IO[Fiber[IO, A]] = + def apply[A](timer: Timer[IO], fa: IO[A]): IO[Fiber[IO, A]] = IO.Async { (_, cb) => implicit val ec: ExecutionContext = immediate // Light async boundary @@ -36,7 +37,7 @@ private[effect] object IOStart { // Starting the source `IO`, with a new connection, because its // cancellation is now decoupled from our current one val conn2 = IOConnection() - IORunLoop.startCancelable(fa, conn2, p.success) + IORunLoop.startCancelable(timer.shift *> fa, conn2, p.success) // Building a memoized IO - note we cannot use `IO.fromFuture` // because we need to link this `IO`'s cancellation with that diff --git a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala index 11046d295d..2776967a0c 100644 --- a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala +++ b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala @@ -36,14 +36,15 @@ class MVarConcurrentTests extends BaseMVarTests { _ <- mVar.put(1).start p2 <- mVar.put(2).start _ <- mVar.put(3).start + _ <- IO.sleep(10.millis) // Give put callbacks a chance to register _ <- p2.cancel _ <- mVar.take r1 <- mVar.take r3 <- mVar.take - } yield List(r1, r3) + } yield Set(r1, r3) for (r <- task.unsafeToFuture()) yield { - r shouldBe List(1, 3) + r shouldBe Set(1, 3) } } @@ -53,15 +54,16 @@ class MVarConcurrentTests extends BaseMVarTests { t1 <- mVar.take.start t2 <- mVar.take.start t3 <- mVar.take.start + _ <- IO.sleep(10.millis) // Give take callbacks a chance to register _ <- t2.cancel _ <- mVar.put(1) _ <- mVar.put(3) r1 <- t1.join r3 <- t3.join - } yield List(r1, r3) + } yield Set(r1, r3) for (r <- task.unsafeToFuture()) yield { - r shouldBe List(1, 3) + r shouldBe Set(1, 3) } } @@ -70,6 +72,7 @@ class MVarConcurrentTests extends BaseMVarTests { mVar <- MVar[IO].empty[Int] finished <- Deferred.uncancelable[IO, Int] fiber <- mVar.read.flatMap(finished.complete).start + _ <- IO.sleep(10.millis) // Give read callback a chance to register _ <- fiber.cancel _ <- mVar.put(10) fallback = IO.sleep(100.millis) *> IO.pure(0) diff --git a/site/src/main/tut/datatypes/io.md b/site/src/main/tut/datatypes/io.md index 0216d3fdfb..36d207c87d 100644 --- a/site/src/main/tut/datatypes/io.md +++ b/site/src/main/tut/datatypes/io.md @@ -573,7 +573,7 @@ thread that can be either joined (via `join`) or interrupted (via Example: ```tut:silent -// Needed in order to get a Timer[IO], for IO.shift below +// Needed in order to get a Timer[IO], for IO.start to execute concurrently import scala.concurrent.ExecutionContext.Implicits.global val launchMissiles = IO.raiseError(new Exception("boom!")) @@ -706,9 +706,8 @@ def fib(n: Int, a: Long, b: Long): IO[Long] = } ``` -Again mentioning this for posterity: with `IO` everything is explicit, -the protocol being easy to follow and predictable in a WYSIWYG -fashion. +With `IO`, fairness needs to be managed explicitly, the protocol being +easy to follow and predictable in a WYSIWYG fashion. ### Race Conditions — race & racePair @@ -720,10 +719,10 @@ losers being usually canceled. ```scala // simple version -def race[A, B](lh: IO[A], rh: IO[B]): IO[Either[A, B]] +def race[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[A, B]] // advanced version -def racePair[A, B](lh: IO[A], rh: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] +def racePair[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] ``` The simple version, `IO.race`, will cancel the loser immediately, @@ -733,7 +732,7 @@ you decide what to do next. So `race` can be derived with `racePair` like so: ```tut:silent -def race[A, B](lh: IO[A], rh: IO[B]): IO[Either[A, B]] = +def race[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[A, B]] = IO.racePair(lh, rh).flatMap { case Left((a, fiber)) => fiber.cancel.map(_ => Left(a)) @@ -762,13 +761,7 @@ def timeout[A](fa: IO[A], after: FiniteDuration) } ``` -NOTE: like all things with `IO`, tasks are not forked automatically -and `race` won't do automatic forking either. So if the given tasks -are asynchronous, they could be executed in parallel, but if they -are not asynchronous (and thus have immediate execution), then -they won't run in parallel. - -Always remember that IO's policy is WYSIWYG. +See *Parallelism* section above for how to obtain a `Timer[IO]` ### Comparison with Haskell's "async interruption" @@ -1205,6 +1198,10 @@ def loop(n: Int): IO[Int] = Since the introduction of the [Parallel](https://github.com/typelevel/cats/blob/master/core/src/main/scala/cats/Parallel.scala) typeclasss in the Cats library and its `IO` instance, it became possible to execute two or more given `IO`s in parallel. +Note: all parallel operations require an implicit `Timer[IO]` in scope. +On JVM, `Timer[IO]` is available when there's an implicit `ExecutionContext` in scope. Alternatively, it can be created using `IO.timer` builder, taking an `ExecutionContext`, managing actual execution, and a `ScheduledExecutorService`, which manages scheduling (not actual execution, so no point in giving it more than one thread). +On JS, `Timer[IO]` is always available. + ### parMapN It has the potential to run an arbitrary number of `IO`s in parallel, and it allows you to apply a function to the result (as in `map`). It finishes processing when all the `IO`s are completed, either successfully or with a failure. For example: @@ -1260,7 +1257,7 @@ There is also `cats.Traverse.sequence` which does this synchronously. ### parTraverse -If you have a list of data and a way of turning each item into an IO, but you want a single IO for the results you can use `parSequence` to run the steps in parallel. +If you have a list of data and a way of turning each item into an IO, but you want a single IO for the results you can use `parTraverse` to run the steps in parallel. ```tut:book val results = NonEmptyList.of(1, 2, 3).parTraverse { i => From 010d6d7b9aa78233caabf17f7d14a68a4ea251f7 Mon Sep 17 00:00:00 2001 From: Oleg Pyzhcov Date: Tue, 22 May 2018 16:38:45 +0300 Subject: [PATCH 6/6] Silence MiMa checks --- build.sbt | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/build.sbt b/build.sbt index 6cabd0ca3d..0c3149b743 100644 --- a/build.sbt +++ b/build.sbt @@ -184,6 +184,22 @@ val mimaSettings = Seq( exclude[ReversedMissingMethodProblem]("cats.effect.Effect#EitherTEffect.toIO"), exclude[ReversedMissingMethodProblem]("cats.effect.Effect.toIO"), exclude[ReversedMissingMethodProblem]("cats.effect.ConcurrentEffect.toIO"), + + + // Require Timer[IO] for auto-shifting now + exclude[DirectMissingMethodProblem]("cats.effect.IO.start"), + exclude[DirectMissingMethodProblem]("cats.effect.IO.race"), + exclude[DirectMissingMethodProblem]("cats.effect.IO.racePair"), + exclude[DirectMissingMethodProblem]("cats.effect.IOParallelNewtype.ioEffect"), + exclude[DirectMissingMethodProblem]("cats.effect.IOInstances.parApplicative"), + exclude[DirectMissingMethodProblem]("cats.effect.IOInstances.ioParallel"), + exclude[DirectMissingMethodProblem]("cats.effect.IOInstances.ioConcurrentEffect"), + exclude[DirectMissingMethodProblem]("cats.effect.internals.IOParMap.apply"), + exclude[DirectMissingMethodProblem]("cats.effect.internals.IOCompanionBinaryCompat.ioEffect"), + exclude[DirectMissingMethodProblem]("cats.effect.internals.IORace.simple"), + exclude[DirectMissingMethodProblem]("cats.effect.internals.IORace.pair"), + exclude[DirectMissingMethodProblem]("cats.effect.internals.IOStart.apply"), + // // Following are all internal implementation details: //