Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Require Timer and shift automatically in concurrent IO operations #232

Merged
merged 6 commits into from
May 23, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[effect] object IOAppPlatform {
IO.raiseError(new AssertionError("IOApp keep alive failed unexpectedly."))
case Right(exitCode) =>
IO.pure(exitCode.code)
}.start
}.start(timer.value)
}

val defaultTimer: Timer[IO] = IOTimer.global
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ private[effect] object IOAppPlatform {
}

def mainFiber(args: Array[String], timer: Eval[Timer[IO]])(run: List[String] => IO[ExitCode]): IO[Fiber[IO, Int]] = {
val _ = timer // unused in this platform

object Canceled extends RuntimeException
for {
latch <- IO(new CountDownLatch(1))
Expand All @@ -49,7 +47,7 @@ private[effect] object IOAppPlatform {
}
.productL(IO(latch.countDown()))
.map(_.code)
.start
.start(timer.value)
_ <- IO(sys.addShutdownHook {
fiber.cancel.unsafeRunSync()
latch.await()
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/cats/effect/Concurrent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ import scala.util.Either
@typeclass
@implicitNotFound("""Cannot find implicit value for Concurrent[${F}].
Building this implicit value might depend on having an implicit
s.c.ExecutionContext in scope, a Scheduler or some equivalent type.""")
s.c.ExecutionContext in scope, a Timer, Scheduler or some equivalent type.""")
trait Concurrent[F[_]] extends Async[F] {
/**
* Creates a cancelable `F[A]` instance that executes an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import scala.util.Either
@typeclass
@implicitNotFound("""Cannot find implicit value for ConcurrentEffect[${F}].
Building this implicit value might depend on having an implicit
s.c.ExecutionContext in scope, a Scheduler or some equivalent type.""")
s.c.ExecutionContext in scope, a Timer, Scheduler or some equivalent type.""")
trait ConcurrentEffect[F[_]] extends Concurrent[F] with Effect[F] {
/**
* Evaluates `F[_]` with the ability to cancel it.
Expand Down
161 changes: 84 additions & 77 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cats
package effect

import cats.arrow.FunctionK
import cats.syntax.apply._
import cats.effect.internals.Callback.Extensions
import cats.effect.internals._
import cats.effect.internals.TrampolineEC.immediate
Expand Down Expand Up @@ -364,13 +365,9 @@ sealed abstract class IO[+A] extends internals.IOBinaryCompat[A] {
* consider in the example above what would happen if the first task
* finishes in error. In that case the second task doesn't get canceled,
* which creates a potential memory leak.
*
* IMPORTANT — this operation does not start with an asynchronous boundary.
* But you can use [[IO.shift(implicit* IO.shift]] to force an async
* boundary just before start.
*/
final def start: IO[Fiber[IO, A @uncheckedVariance]] =
IOStart(this)
final def start(implicit timer: Timer[IO]): IO[Fiber[IO, A @uncheckedVariance]] =
IOStart(timer.shift *> this)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we just make IOStart(timer, this) so if we never actually evaluatue the result we save some allocations.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 agree with @johnynek


/**
* Returns a new `IO` that mirrors the source task for normal termination,
Expand Down Expand Up @@ -674,104 +671,114 @@ private[effect] abstract class IOLowPriorityInstances extends IOParallelNewtype
}

implicit def ioSemigroup[A: Semigroup]: Semigroup[IO[A]] = new IOSemigroup[A]

implicit val ioEffect: Effect[IO] = new IOEffect
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nice thing about this is that we get to keep the old ioEffect, with the previous type.


private[effect] class IOEffect extends Effect[IO] with StackSafeMonad[IO] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WTF I did not know there's a StackSafeMonad in Cats since Jun 20, 2017.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not that known because it's not appearing in user code, I think. But yeah, it's nice to not copy-paste same tailRecM across all your types :)

final override def pure[A](a: A): IO[A] =
IO.pure(a)
final override def unit: IO[Unit] =
IO.unit

final override def map[A, B](fa: IO[A])(f: A => B): IO[B] =
fa.map(f)
final override def flatMap[A, B](ioa: IO[A])(f: A => IO[B]): IO[B] =
ioa.flatMap(f)

final override def attempt[A](ioa: IO[A]): IO[Either[Throwable, A]] =
ioa.attempt
final override def handleErrorWith[A](ioa: IO[A])(f: Throwable => IO[A]): IO[A] =
ioa.handleErrorWith(f)
final override def raiseError[A](e: Throwable): IO[A] =
IO.raiseError(e)

final override def bracket[A, B](acquire: IO[A])
(use: A => IO[B])
(release: A => IO[Unit]): IO[B] =
acquire.bracket(use)(release)

final override def bracketCase[A, B](acquire: IO[A])
(use: A => IO[B])
(release: (A, ExitCase[Throwable]) => IO[Unit]): IO[B] =
acquire.bracketCase(use)(release)

final override def delay[A](thunk: => A): IO[A] =
IO(thunk)
final override def suspend[A](thunk: => IO[A]): IO[A] =
IO.suspend(thunk)
final override def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] =
IO.async(k)
override def liftIO[A](ioa: IO[A]): IO[A] =
ioa
override def toIO[A](fa: IO[A]): IO[A] =
fa
final override def runAsync[A](ioa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[Unit] =
ioa.runAsync(cb)
final override def runSyncStep[A](ioa: IO[A]): IO[Either[IO[A], A]] =
ioa.runSyncStep
}
}

private[effect] abstract class IOInstances extends IOLowPriorityInstances {

implicit val parApplicative: Applicative[IO.Par] = new Applicative[IO.Par] {
implicit def parApplicative(implicit timer: Timer[IO]): Applicative[IO.Par] = new Applicative[IO.Par] {
import IO.Par.{unwrap, apply => par}

override def pure[A](x: A): IO.Par[A] =
final override def pure[A](x: A): IO.Par[A] =
par(IO.pure(x))
override def map2[A, B, Z](fa: IO.Par[A], fb: IO.Par[B])(f: (A, B) => Z): IO.Par[Z] =
par(IOParMap(unwrap(fa), unwrap(fb))(f))
override def ap[A, B](ff: IO.Par[A => B])(fa: IO.Par[A]): IO.Par[B] =
final override def map2[A, B, Z](fa: IO.Par[A], fb: IO.Par[B])(f: (A, B) => Z): IO.Par[Z] =
par(IOParMap(timer, unwrap(fa), unwrap(fb))(f))
final override def ap[A, B](ff: IO.Par[A => B])(fa: IO.Par[A]): IO.Par[B] =
map2(ff, fa)(_(_))
override def product[A, B](fa: IO.Par[A], fb: IO.Par[B]): IO.Par[(A, B)] =
final override def product[A, B](fa: IO.Par[A], fb: IO.Par[B]): IO.Par[(A, B)] =
map2(fa, fb)((_, _))
override def map[A, B](fa: IO.Par[A])(f: A => B): IO.Par[B] =
final override def map[A, B](fa: IO.Par[A])(f: A => B): IO.Par[B] =
par(unwrap(fa).map(f))
override def unit: IO.Par[Unit] =
final override def unit: IO.Par[Unit] =
par(IO.unit)
}

implicit val ioConcurrentEffect: ConcurrentEffect[IO] = new ConcurrentEffect[IO] {
override def pure[A](a: A): IO[A] =
IO.pure(a)
override def flatMap[A, B](ioa: IO[A])(f: A => IO[B]): IO[B] =
ioa.flatMap(f)
override def map[A, B](fa: IO[A])(f: A => B): IO[B] =
fa.map(f)
override def delay[A](thunk: => A): IO[A] =
IO(thunk)
override def unit: IO[Unit] =
IO.unit
override def attempt[A](ioa: IO[A]): IO[Either[Throwable, A]] =
ioa.attempt
override def handleErrorWith[A](ioa: IO[A])(f: Throwable => IO[A]): IO[A] =
ioa.handleErrorWith(f)
override def raiseError[A](e: Throwable): IO[A] =
IO.raiseError(e)
override def suspend[A](thunk: => IO[A]): IO[A] =
IO.suspend(thunk)
override def start[A](fa: IO[A]): IO[Fiber[IO, A]] =
implicit def ioConcurrentEffect(implicit timer: Timer[IO]): ConcurrentEffect[IO] = new IOEffect with ConcurrentEffect[IO] {
final override def start[A](fa: IO[A]): IO[Fiber[IO, A]] =
fa.start
override def uncancelable[A](fa: IO[A]): IO[A] =
final override def uncancelable[A](fa: IO[A]): IO[A] =
fa.uncancelable
override def onCancelRaiseError[A](fa: IO[A], e: Throwable): IO[A] =
final override def onCancelRaiseError[A](fa: IO[A], e: Throwable): IO[A] =
fa.onCancelRaiseError(e)
override def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] =
IO.async(k)
override def race[A, B](fa: IO[A], fb: IO[B]): IO[Either[A, B]] =

final override def race[A, B](fa: IO[A], fb: IO[B]): IO[Either[A, B]] =
IO.race(fa, fb)
override def racePair[A, B](fa: IO[A], fb: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] =
final override def racePair[A, B](fa: IO[A], fb: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] =
IO.racePair(fa, fb)
override def runAsync[A](ioa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[Unit] =
ioa.runAsync(cb)
override def runSyncStep[A](ioa: IO[A]): IO[Either[IO[A], A]] =
ioa.runSyncStep
override def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A] =

final override def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A] =
IO.cancelable(k)
override def runCancelable[A](fa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[IO[Unit]] =
final override def runCancelable[A](fa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[IO[Unit]] =
fa.runCancelable(cb)
override def toIO[A](fa: IO[A]): IO[A] =
fa
override def liftIO[A](ioa: IO[A]): IO[A] =
ioa
// this will use stack proportional to the maximum number of joined async suspensions
override def tailRecM[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] =
f(a).flatMap {
case Left(a) => tailRecM(a)(f)
case Right(b) => pure(b)
}
override def bracket[A, B](acquire: IO[A])
(use: A => IO[B])
(release: A => IO[Unit]): IO[B] =
acquire.bracket(use)(release)
override def bracketCase[A, B](acquire: IO[A])
(use: A => IO[B])
(release: (A, ExitCase[Throwable]) => IO[Unit]): IO[B] =
acquire.bracketCase(use)(release)

final override def toIO[A](fa: IO[A]): IO[A] = fa
final override def liftIO[A](ioa: IO[A]): IO[A] = ioa
}

implicit val ioParallel: Parallel[IO, IO.Par] =
implicit def ioParallel(implicit timer: Timer[IO]): Parallel[IO, IO.Par] =
new Parallel[IO, IO.Par] {
override def applicative: Applicative[IO.Par] =
parApplicative
override def monad: Monad[IO] =
ioConcurrentEffect
override val sequential: ~>[IO.Par, IO] =
final override val applicative: Applicative[IO.Par] =
parApplicative(timer)
final override val monad: Monad[IO] =
ioConcurrentEffect(timer)

final override val sequential: ~>[IO.Par, IO] =
new FunctionK[IO.Par, IO] { def apply[A](fa: IO.Par[A]): IO[A] = IO.Par.unwrap(fa) }
override val parallel: ~>[IO, IO.Par] =
final override val parallel: ~>[IO, IO.Par] =
new FunctionK[IO, IO.Par] { def apply[A](fa: IO[A]): IO.Par[A] = IO.Par(fa) }
}

implicit def ioMonoid[A: Monoid]: Monoid[IO[A]] = new IOSemigroup[A] with Monoid[IO[A]] {
def empty = IO.pure(Monoid[A].empty)
final override def empty: IO[A] = IO.pure(Monoid[A].empty)
}

implicit val ioSemigroupK: SemigroupK[IO] = new SemigroupK[IO] {
def combineK[A](a: IO[A], b: IO[A]): IO[A] =
final override def combineK[A](a: IO[A], b: IO[A]): IO[A] =
ApplicativeError[IO, Throwable].handleErrorWith(a)(_ => b)
}
}
Expand Down Expand Up @@ -1171,8 +1178,8 @@ object IO extends IOInstances {
* Also see [[racePair]] for a version that does not cancel
* the loser automatically on successful results.
*/
def race[A, B](lh: IO[A], rh: IO[B]): IO[Either[A, B]] =
IORace.simple(lh, rh)
def race[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[A, B]] =
IORace.simple(timer, lh, rh)

/**
* Run two IO tasks concurrently, and returns a pair
Expand Down Expand Up @@ -1202,8 +1209,8 @@ object IO extends IOInstances {
* See [[race]] for a simpler version that cancels the loser
* immediately.
*/
def racePair[A, B](lh: IO[A], rh: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] =
IORace.pair(lh, rh)
def racePair[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] =
IORace.pair(timer, lh, rh)

private[effect] final case class Pure[+A](a: A)
extends IO[A]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,6 @@ private[effect] trait IOBinaryCompat[+A] { self: IO[A] =>
}

private[effect] trait IOCompanionBinaryCompat {
/**
* DEPRECATED — name and type changed in [[IO.ioConcurrentEffect]].
*
* This old variant is kept in order to keep binary compatibility
* until 1.0 — when it will be removed completely.
*/
@deprecated("Renamed to ioConcurrentEffect", "0.10")
private[internals] def ioEffect: Effect[IO] = IO.ioConcurrentEffect

/**
* DEPRECATED — the `ec` parameter is gone.
*
Expand Down
13 changes: 8 additions & 5 deletions core/shared/src/main/scala/cats/effect/internals/IOParMap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@

package cats.effect.internals

import cats.effect.IO
import cats.syntax.apply._
import cats.effect.{IO, Timer}
import cats.effect.internals.Callback.Extensions

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.ExecutionContext

private[effect] object IOParMap {
import Callback.{Type => Callback}

/** Implementation for `parMap2`. */
def apply[A, B, C](fa: IO[A], fb: IO[B])(f: (A, B) => C): IO[C] =
def apply[A, B, C](timer: Timer[IO], fa: IO[A], fb: IO[B])(f: (A, B) => C): IO[C] =
IO.Async { (conn, cb) =>
// For preventing stack-overflow errors; using a
// trampolined execution context, so no thread forks
Expand Down Expand Up @@ -54,8 +56,8 @@ private[effect] object IOParMap {
// NOTE: conn.pop() happens when cb gets called!
conn.push(() => Cancelable.cancelAll(connA.cancel, connB.cancel))

IORunLoop.startCancelable(fa, connA, callbackA(connB))
IORunLoop.startCancelable(fb, connB, callbackB(connA))
IORunLoop.startCancelable(timer.shift *> fa, connA, callbackA(connB))
IORunLoop.startCancelable(timer.shift *> fb, connB, callbackB(connA))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to shift both? Isn’t shifting just fa enough?

}

/** Callback for the left task. */
Expand Down Expand Up @@ -98,7 +100,7 @@ private[effect] object IOParMap {
}

/** Called when an error is generated. */
def sendError(other: IOConnection, e: Throwable): Unit =
def sendError(other: IOConnection, e: Throwable): Unit = {
state.getAndSet(e) match {
case _: Throwable =>
Logger.reportFailure(e)
Expand All @@ -107,6 +109,7 @@ private[effect] object IOParMap {
try other.cancel() finally
cb.async(conn, Left(e))
}
}
})
}
}
13 changes: 7 additions & 6 deletions core/shared/src/main/scala/cats/effect/internals/IORace.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package internals

import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Promise
import cats.syntax.apply._
import cats.effect.internals.Callback.{Type => Callback}
import cats.effect.internals.Callback.Extensions

Expand All @@ -28,7 +29,7 @@ private[effect] object IORace {
* but this way it is more efficient, as we no longer have to keep
* internal promises.
*/
def simple[A, B](lh: IO[A], rh: IO[B]): IO[Either[A, B]] = {
def simple[A, B](timer: Timer[IO], lh: IO[A], rh: IO[B]): IO[Either[A, B]] = {
// Signals successful results
def onSuccess[T, U](
isActive: AtomicBoolean,
Expand Down Expand Up @@ -65,15 +66,15 @@ private[effect] object IORace {
val connR = IOConnection()

// Starts concurrent execution for the left value
IORunLoop.startCancelable[A](lh, connL, {
IORunLoop.startCancelable[A](timer.shift *> lh, connL, {
case Right(a) =>
onSuccess(active, connR, cb, Left(a))
case Left(err) =>
onError(active, cb, connR, err)
})

// Starts concurrent execution for the right value
IORunLoop.startCancelable[B](rh, connR, {
IORunLoop.startCancelable[B](timer.shift *> rh, connR, {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn’t shifting just the left side enough?

Copy link
Contributor Author

@oleg-py oleg-py May 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be enough, but I'm not sure (my understanding of concurrency and internals is still a little bit off 😅 ), and I also didn't want to introduce a behavior (right-hand side start immediately on the current thread) which would break the symmetry between IO.race(fa, fb) and IO.race(fb, fa).map(_.swap).

And if optimizing it away would not be a breaking change, we can always do it later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Also if you fork one thread, then execute the other immediately, it's not really a fair race 🙂

I think that we'll also be able to do an optimization like I introduced in Monix, where we can detect tasks that are forked for the obvious cases. For example we can detect shift *> delay(f) and prevent an extra shift.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... such optimizations are dirty though, I wouldn't do it in this PR. I'd rather merge the PR ASAP, because @mpilquist needs it in FS2.

case Right(b) =>
onSuccess(active, connL, cb, Right(b))
case Left(err) =>
Expand All @@ -88,7 +89,7 @@ private[effect] object IORace {
/**
* Implementation for `IO.racePair`
*/
def pair[A, B](lh: IO[A], rh: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = {
def pair[A, B](timer: Timer[IO], lh: IO[A], rh: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = {
IO.cancelable { cb =>
val active = new AtomicBoolean(true)
// Cancelable connection for the left value
Expand All @@ -99,7 +100,7 @@ private[effect] object IORace {
val promiseR = Promise[Either[Throwable, B]]()

// Starts concurrent execution for the left value
IORunLoop.startCancelable[A](lh, connL, {
IORunLoop.startCancelable[A](timer.shift *> lh, connL, {
case Right(a) =>
if (active.getAndSet(false))
cb.async(Right(Left((a, IOFiber.build[B](promiseR, connR)))))
Expand All @@ -116,7 +117,7 @@ private[effect] object IORace {
})

// Starts concurrent execution for the right value
IORunLoop.startCancelable[B](rh, connR, {
IORunLoop.startCancelable[B](timer.shift *> rh, connR, {
case Right(b) =>
if (active.getAndSet(false))
cb.async(Right(Right((IOFiber.build[A](promiseL, connL), b))))
Expand Down
Loading