-
Notifications
You must be signed in to change notification settings - Fork 535
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Require Timer and shift automatically in concurrent IO operations #232
Changes from 4 commits
0934ef3
f07fcee
1729f6b
5107d19
a3b9e3b
010d6d7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ package cats | |
package effect | ||
|
||
import cats.arrow.FunctionK | ||
import cats.syntax.apply._ | ||
import cats.effect.internals.Callback.Extensions | ||
import cats.effect.internals._ | ||
import cats.effect.internals.TrampolineEC.immediate | ||
|
@@ -364,13 +365,9 @@ sealed abstract class IO[+A] extends internals.IOBinaryCompat[A] { | |
* consider in the example above what would happen if the first task | ||
* finishes in error. In that case the second task doesn't get canceled, | ||
* which creates a potential memory leak. | ||
* | ||
* IMPORTANT — this operation does not start with an asynchronous boundary. | ||
* But you can use [[IO.shift(implicit* IO.shift]] to force an async | ||
* boundary just before start. | ||
*/ | ||
final def start: IO[Fiber[IO, A @uncheckedVariance]] = | ||
IOStart(this) | ||
final def start(implicit timer: Timer[IO]): IO[Fiber[IO, A @uncheckedVariance]] = | ||
IOStart(timer.shift *> this) | ||
|
||
/** | ||
* Returns a new `IO` that mirrors the source task for normal termination, | ||
|
@@ -674,104 +671,114 @@ private[effect] abstract class IOLowPriorityInstances extends IOParallelNewtype | |
} | ||
|
||
implicit def ioSemigroup[A: Semigroup]: Semigroup[IO[A]] = new IOSemigroup[A] | ||
|
||
implicit val ioEffect: Effect[IO] = new IOEffect | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The nice thing about this is that we get to keep the old |
||
|
||
private[effect] class IOEffect extends Effect[IO] with StackSafeMonad[IO] { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. WTF I did not know there's a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not that known because it's not appearing in user code, I think. But yeah, it's nice to not copy-paste same |
||
final override def pure[A](a: A): IO[A] = | ||
IO.pure(a) | ||
final override def unit: IO[Unit] = | ||
IO.unit | ||
|
||
final override def map[A, B](fa: IO[A])(f: A => B): IO[B] = | ||
fa.map(f) | ||
final override def flatMap[A, B](ioa: IO[A])(f: A => IO[B]): IO[B] = | ||
ioa.flatMap(f) | ||
|
||
final override def attempt[A](ioa: IO[A]): IO[Either[Throwable, A]] = | ||
ioa.attempt | ||
final override def handleErrorWith[A](ioa: IO[A])(f: Throwable => IO[A]): IO[A] = | ||
ioa.handleErrorWith(f) | ||
final override def raiseError[A](e: Throwable): IO[A] = | ||
IO.raiseError(e) | ||
|
||
final override def bracket[A, B](acquire: IO[A]) | ||
(use: A => IO[B]) | ||
(release: A => IO[Unit]): IO[B] = | ||
acquire.bracket(use)(release) | ||
|
||
final override def bracketCase[A, B](acquire: IO[A]) | ||
(use: A => IO[B]) | ||
(release: (A, ExitCase[Throwable]) => IO[Unit]): IO[B] = | ||
acquire.bracketCase(use)(release) | ||
|
||
final override def delay[A](thunk: => A): IO[A] = | ||
IO(thunk) | ||
final override def suspend[A](thunk: => IO[A]): IO[A] = | ||
IO.suspend(thunk) | ||
final override def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = | ||
IO.async(k) | ||
override def liftIO[A](ioa: IO[A]): IO[A] = | ||
ioa | ||
override def toIO[A](fa: IO[A]): IO[A] = | ||
fa | ||
final override def runAsync[A](ioa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[Unit] = | ||
ioa.runAsync(cb) | ||
final override def runSyncStep[A](ioa: IO[A]): IO[Either[IO[A], A]] = | ||
ioa.runSyncStep | ||
} | ||
} | ||
|
||
private[effect] abstract class IOInstances extends IOLowPriorityInstances { | ||
|
||
implicit val parApplicative: Applicative[IO.Par] = new Applicative[IO.Par] { | ||
implicit def parApplicative(implicit timer: Timer[IO]): Applicative[IO.Par] = new Applicative[IO.Par] { | ||
import IO.Par.{unwrap, apply => par} | ||
|
||
override def pure[A](x: A): IO.Par[A] = | ||
final override def pure[A](x: A): IO.Par[A] = | ||
par(IO.pure(x)) | ||
override def map2[A, B, Z](fa: IO.Par[A], fb: IO.Par[B])(f: (A, B) => Z): IO.Par[Z] = | ||
par(IOParMap(unwrap(fa), unwrap(fb))(f)) | ||
override def ap[A, B](ff: IO.Par[A => B])(fa: IO.Par[A]): IO.Par[B] = | ||
final override def map2[A, B, Z](fa: IO.Par[A], fb: IO.Par[B])(f: (A, B) => Z): IO.Par[Z] = | ||
par(IOParMap(timer, unwrap(fa), unwrap(fb))(f)) | ||
final override def ap[A, B](ff: IO.Par[A => B])(fa: IO.Par[A]): IO.Par[B] = | ||
map2(ff, fa)(_(_)) | ||
override def product[A, B](fa: IO.Par[A], fb: IO.Par[B]): IO.Par[(A, B)] = | ||
final override def product[A, B](fa: IO.Par[A], fb: IO.Par[B]): IO.Par[(A, B)] = | ||
map2(fa, fb)((_, _)) | ||
override def map[A, B](fa: IO.Par[A])(f: A => B): IO.Par[B] = | ||
final override def map[A, B](fa: IO.Par[A])(f: A => B): IO.Par[B] = | ||
par(unwrap(fa).map(f)) | ||
override def unit: IO.Par[Unit] = | ||
final override def unit: IO.Par[Unit] = | ||
par(IO.unit) | ||
} | ||
|
||
implicit val ioConcurrentEffect: ConcurrentEffect[IO] = new ConcurrentEffect[IO] { | ||
override def pure[A](a: A): IO[A] = | ||
IO.pure(a) | ||
override def flatMap[A, B](ioa: IO[A])(f: A => IO[B]): IO[B] = | ||
ioa.flatMap(f) | ||
override def map[A, B](fa: IO[A])(f: A => B): IO[B] = | ||
fa.map(f) | ||
override def delay[A](thunk: => A): IO[A] = | ||
IO(thunk) | ||
override def unit: IO[Unit] = | ||
IO.unit | ||
override def attempt[A](ioa: IO[A]): IO[Either[Throwable, A]] = | ||
ioa.attempt | ||
override def handleErrorWith[A](ioa: IO[A])(f: Throwable => IO[A]): IO[A] = | ||
ioa.handleErrorWith(f) | ||
override def raiseError[A](e: Throwable): IO[A] = | ||
IO.raiseError(e) | ||
override def suspend[A](thunk: => IO[A]): IO[A] = | ||
IO.suspend(thunk) | ||
override def start[A](fa: IO[A]): IO[Fiber[IO, A]] = | ||
implicit def ioConcurrentEffect(implicit timer: Timer[IO]): ConcurrentEffect[IO] = new IOEffect with ConcurrentEffect[IO] { | ||
final override def start[A](fa: IO[A]): IO[Fiber[IO, A]] = | ||
fa.start | ||
override def uncancelable[A](fa: IO[A]): IO[A] = | ||
final override def uncancelable[A](fa: IO[A]): IO[A] = | ||
fa.uncancelable | ||
override def onCancelRaiseError[A](fa: IO[A], e: Throwable): IO[A] = | ||
final override def onCancelRaiseError[A](fa: IO[A], e: Throwable): IO[A] = | ||
fa.onCancelRaiseError(e) | ||
override def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = | ||
IO.async(k) | ||
override def race[A, B](fa: IO[A], fb: IO[B]): IO[Either[A, B]] = | ||
|
||
final override def race[A, B](fa: IO[A], fb: IO[B]): IO[Either[A, B]] = | ||
IO.race(fa, fb) | ||
override def racePair[A, B](fa: IO[A], fb: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = | ||
final override def racePair[A, B](fa: IO[A], fb: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = | ||
IO.racePair(fa, fb) | ||
override def runAsync[A](ioa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[Unit] = | ||
ioa.runAsync(cb) | ||
override def runSyncStep[A](ioa: IO[A]): IO[Either[IO[A], A]] = | ||
ioa.runSyncStep | ||
override def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A] = | ||
|
||
final override def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A] = | ||
IO.cancelable(k) | ||
override def runCancelable[A](fa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[IO[Unit]] = | ||
final override def runCancelable[A](fa: IO[A])(cb: Either[Throwable, A] => IO[Unit]): IO[IO[Unit]] = | ||
fa.runCancelable(cb) | ||
override def toIO[A](fa: IO[A]): IO[A] = | ||
fa | ||
override def liftIO[A](ioa: IO[A]): IO[A] = | ||
ioa | ||
// this will use stack proportional to the maximum number of joined async suspensions | ||
override def tailRecM[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = | ||
f(a).flatMap { | ||
case Left(a) => tailRecM(a)(f) | ||
case Right(b) => pure(b) | ||
} | ||
override def bracket[A, B](acquire: IO[A]) | ||
(use: A => IO[B]) | ||
(release: A => IO[Unit]): IO[B] = | ||
acquire.bracket(use)(release) | ||
override def bracketCase[A, B](acquire: IO[A]) | ||
(use: A => IO[B]) | ||
(release: (A, ExitCase[Throwable]) => IO[Unit]): IO[B] = | ||
acquire.bracketCase(use)(release) | ||
|
||
final override def toIO[A](fa: IO[A]): IO[A] = fa | ||
final override def liftIO[A](ioa: IO[A]): IO[A] = ioa | ||
} | ||
|
||
implicit val ioParallel: Parallel[IO, IO.Par] = | ||
implicit def ioParallel(implicit timer: Timer[IO]): Parallel[IO, IO.Par] = | ||
new Parallel[IO, IO.Par] { | ||
override def applicative: Applicative[IO.Par] = | ||
parApplicative | ||
override def monad: Monad[IO] = | ||
ioConcurrentEffect | ||
override val sequential: ~>[IO.Par, IO] = | ||
final override val applicative: Applicative[IO.Par] = | ||
parApplicative(timer) | ||
final override val monad: Monad[IO] = | ||
ioConcurrentEffect(timer) | ||
|
||
final override val sequential: ~>[IO.Par, IO] = | ||
new FunctionK[IO.Par, IO] { def apply[A](fa: IO.Par[A]): IO[A] = IO.Par.unwrap(fa) } | ||
override val parallel: ~>[IO, IO.Par] = | ||
final override val parallel: ~>[IO, IO.Par] = | ||
new FunctionK[IO, IO.Par] { def apply[A](fa: IO[A]): IO.Par[A] = IO.Par(fa) } | ||
} | ||
|
||
implicit def ioMonoid[A: Monoid]: Monoid[IO[A]] = new IOSemigroup[A] with Monoid[IO[A]] { | ||
def empty = IO.pure(Monoid[A].empty) | ||
final override def empty: IO[A] = IO.pure(Monoid[A].empty) | ||
} | ||
|
||
implicit val ioSemigroupK: SemigroupK[IO] = new SemigroupK[IO] { | ||
def combineK[A](a: IO[A], b: IO[A]): IO[A] = | ||
final override def combineK[A](a: IO[A], b: IO[A]): IO[A] = | ||
ApplicativeError[IO, Throwable].handleErrorWith(a)(_ => b) | ||
} | ||
} | ||
|
@@ -1171,8 +1178,8 @@ object IO extends IOInstances { | |
* Also see [[racePair]] for a version that does not cancel | ||
* the loser automatically on successful results. | ||
*/ | ||
def race[A, B](lh: IO[A], rh: IO[B]): IO[Either[A, B]] = | ||
IORace.simple(lh, rh) | ||
def race[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[A, B]] = | ||
IORace.simple(timer, lh, rh) | ||
|
||
/** | ||
* Run two IO tasks concurrently, and returns a pair | ||
|
@@ -1202,8 +1209,8 @@ object IO extends IOInstances { | |
* See [[race]] for a simpler version that cancels the loser | ||
* immediately. | ||
*/ | ||
def racePair[A, B](lh: IO[A], rh: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = | ||
IORace.pair(lh, rh) | ||
def racePair[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = | ||
IORace.pair(timer, lh, rh) | ||
|
||
private[effect] final case class Pure[+A](a: A) | ||
extends IO[A] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,16 +16,18 @@ | |
|
||
package cats.effect.internals | ||
|
||
import cats.effect.IO | ||
import cats.syntax.apply._ | ||
import cats.effect.{IO, Timer} | ||
import cats.effect.internals.Callback.Extensions | ||
|
||
import java.util.concurrent.atomic.AtomicReference | ||
import scala.concurrent.ExecutionContext | ||
|
||
private[effect] object IOParMap { | ||
import Callback.{Type => Callback} | ||
|
||
/** Implementation for `parMap2`. */ | ||
def apply[A, B, C](fa: IO[A], fb: IO[B])(f: (A, B) => C): IO[C] = | ||
def apply[A, B, C](timer: Timer[IO], fa: IO[A], fb: IO[B])(f: (A, B) => C): IO[C] = | ||
IO.Async { (conn, cb) => | ||
// For preventing stack-overflow errors; using a | ||
// trampolined execution context, so no thread forks | ||
|
@@ -54,8 +56,8 @@ private[effect] object IOParMap { | |
// NOTE: conn.pop() happens when cb gets called! | ||
conn.push(() => Cancelable.cancelAll(connA.cancel, connB.cancel)) | ||
|
||
IORunLoop.startCancelable(fa, connA, callbackA(connB)) | ||
IORunLoop.startCancelable(fb, connB, callbackB(connA)) | ||
IORunLoop.startCancelable(timer.shift *> fa, connA, callbackA(connB)) | ||
IORunLoop.startCancelable(timer.shift *> fb, connB, callbackB(connA)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to shift both? Isn’t shifting just fa enough? |
||
} | ||
|
||
/** Callback for the left task. */ | ||
|
@@ -98,7 +100,7 @@ private[effect] object IOParMap { | |
} | ||
|
||
/** Called when an error is generated. */ | ||
def sendError(other: IOConnection, e: Throwable): Unit = | ||
def sendError(other: IOConnection, e: Throwable): Unit = { | ||
state.getAndSet(e) match { | ||
case _: Throwable => | ||
Logger.reportFailure(e) | ||
|
@@ -107,6 +109,7 @@ private[effect] object IOParMap { | |
try other.cancel() finally | ||
cb.async(conn, Left(e)) | ||
} | ||
} | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ package internals | |
|
||
import java.util.concurrent.atomic.AtomicBoolean | ||
import scala.concurrent.Promise | ||
import cats.syntax.apply._ | ||
import cats.effect.internals.Callback.{Type => Callback} | ||
import cats.effect.internals.Callback.Extensions | ||
|
||
|
@@ -28,7 +29,7 @@ private[effect] object IORace { | |
* but this way it is more efficient, as we no longer have to keep | ||
* internal promises. | ||
*/ | ||
def simple[A, B](lh: IO[A], rh: IO[B]): IO[Either[A, B]] = { | ||
def simple[A, B](timer: Timer[IO], lh: IO[A], rh: IO[B]): IO[Either[A, B]] = { | ||
// Signals successful results | ||
def onSuccess[T, U]( | ||
isActive: AtomicBoolean, | ||
|
@@ -65,15 +66,15 @@ private[effect] object IORace { | |
val connR = IOConnection() | ||
|
||
// Starts concurrent execution for the left value | ||
IORunLoop.startCancelable[A](lh, connL, { | ||
IORunLoop.startCancelable[A](timer.shift *> lh, connL, { | ||
case Right(a) => | ||
onSuccess(active, connR, cb, Left(a)) | ||
case Left(err) => | ||
onError(active, cb, connR, err) | ||
}) | ||
|
||
// Starts concurrent execution for the right value | ||
IORunLoop.startCancelable[B](rh, connR, { | ||
IORunLoop.startCancelable[B](timer.shift *> rh, connR, { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why isn’t shifting just the left side enough? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It might be enough, but I'm not sure (my understanding of concurrency and internals is still a little bit off 😅 ), and I also didn't want to introduce a behavior (right-hand side start immediately on the current thread) which would break the symmetry between And if optimizing it away would not be a breaking change, we can always do it later. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree. Also if you fork one thread, then execute the other immediately, it's not really a fair race 🙂 I think that we'll also be able to do an optimization like I introduced in Monix, where we can detect tasks that are forked for the obvious cases. For example we can detect There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ... such optimizations are dirty though, I wouldn't do it in this PR. I'd rather merge the PR ASAP, because @mpilquist needs it in FS2. |
||
case Right(b) => | ||
onSuccess(active, connL, cb, Right(b)) | ||
case Left(err) => | ||
|
@@ -88,7 +89,7 @@ private[effect] object IORace { | |
/** | ||
* Implementation for `IO.racePair` | ||
*/ | ||
def pair[A, B](lh: IO[A], rh: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = { | ||
def pair[A, B](timer: Timer[IO], lh: IO[A], rh: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = { | ||
IO.cancelable { cb => | ||
val active = new AtomicBoolean(true) | ||
// Cancelable connection for the left value | ||
|
@@ -99,7 +100,7 @@ private[effect] object IORace { | |
val promiseR = Promise[Either[Throwable, B]]() | ||
|
||
// Starts concurrent execution for the left value | ||
IORunLoop.startCancelable[A](lh, connL, { | ||
IORunLoop.startCancelable[A](timer.shift *> lh, connL, { | ||
case Right(a) => | ||
if (active.getAndSet(false)) | ||
cb.async(Right(Left((a, IOFiber.build[B](promiseR, connR))))) | ||
|
@@ -116,7 +117,7 @@ private[effect] object IORace { | |
}) | ||
|
||
// Starts concurrent execution for the right value | ||
IORunLoop.startCancelable[B](rh, connR, { | ||
IORunLoop.startCancelable[B](timer.shift *> rh, connR, { | ||
case Right(b) => | ||
if (active.getAndSet(false)) | ||
cb.async(Right(Right((IOFiber.build[A](promiseL, connL), b)))) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we just make
IOStart(timer, this)
so if we never actually evaluatue the result we save some allocations.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 agree with @johnynek