-
Notifications
You must be signed in to change notification settings - Fork 535
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Require Timer and shift automatically in concurrent IO operations #232
Require Timer and shift automatically in concurrent IO operations #232
Conversation
val fakeErr = new PrintStream(outStream) | ||
System.setErr(fakeErr) | ||
|
||
Future.fromTry(Try(thunk)).flatten.andThen { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Flatten isn't available on 2.11 unless you import cats syntax :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, will fix when I get to code :)
def race[A, B](lh: IO[A], rh: IO[B]): IO[Either[A, B]] = | ||
IORace.simple(lh, rh) | ||
def race[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[A, B]] = | ||
IORace.simple(timer.shift *> lh, timer.shift *> rh) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should just take timer, lh, and rh directly on simple and delay or even avoid the allocation of *>
final def start: IO[Fiber[IO, A @uncheckedVariance]] = | ||
IOStart(this) | ||
final def start(implicit timer: Timer[IO]): IO[Fiber[IO, A @uncheckedVariance]] = | ||
IOStart(timer.shift *> this) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we just make IOStart(timer, this)
so if we never actually evaluatue the result we save some allocations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 agree with @johnynek
def racePair[A, B](lh: IO[A], rh: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = | ||
IORace.pair(lh, rh) | ||
def racePair[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] = | ||
IORace.pair(timer.shift *> lh, timer.shift *> rh) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar comment about just taking the timer here vs manually shifting.
final def start: IO[Fiber[IO, A @uncheckedVariance]] = | ||
IOStart(this) | ||
final def start(implicit timer: Timer[IO]): IO[Fiber[IO, A @uncheckedVariance]] = | ||
IOStart(timer.shift *> this) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 agree with @johnynek
@@ -674,17 +671,63 @@ private[effect] abstract class IOLowPriorityInstances extends IOParallelNewtype | |||
} | |||
|
|||
implicit def ioSemigroup[A: Semigroup]: Semigroup[IO[A]] = new IOSemigroup[A] | |||
|
|||
implicit val ioEffect: Effect[IO] = new IOEffect |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The nice thing about this is that we get to keep the old ioEffect
, with the previous type.
|
||
implicit val ioEffect: Effect[IO] = new IOEffect | ||
|
||
private[effect] class IOEffect extends Effect[IO] with StackSafeMonad[IO] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WTF I did not know there's a StackSafeMonad
in Cats since Jun 20, 2017.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not that known because it's not appearing in user code, I think. But yeah, it's nice to not copy-paste same tailRecM
across all your types :)
override def unit: IO[Unit] = | ||
IO.unit | ||
|
||
override def map[A, B](fa: IO[A])(f: A => B): IO[B] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would mark all of these defs as final
.
Supposedly it helps the JVM optimize code. Never seen concrete evidence of it, but unless you want to leave the door open for overrides, there's no point in keeping them non-final.
import IO.Par.{unwrap, apply => par} | ||
|
||
override def pure[A](x: A): IO.Par[A] = | ||
par(IO.pure(x)) | ||
override def map2[A, B, Z](fa: IO.Par[A], fb: IO.Par[B])(f: (A, B) => Z): IO.Par[Z] = | ||
par(IOParMap(unwrap(fa), unwrap(fb))(f)) | ||
par(IOParMap(timer.shift *> unwrap(fa), timer.shift *> unwrap(fb))(f)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might want to deploy the same trick I did in Monix for detecting already forked tasks.
But it doesn't have to happen in this PR, plus it probably needs good validation that it works in Monix first 🙂
@alexandru there's a problem with MVar tests on JS with auto-shifting, can you take a look? 😄 |
IORunLoop.startCancelable(fa, connA, callbackA(connB)) | ||
IORunLoop.startCancelable(fb, connB, callbackB(connA)) | ||
IORunLoop.startCancelable(timer.shift *> fa, connA, callbackA(connB)) | ||
IORunLoop.startCancelable(timer.shift *> fb, connB, callbackB(connA)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to shift both? Isn’t shifting just fa enough?
case Right(a) => | ||
onSuccess(active, connR, cb, Left(a)) | ||
case Left(err) => | ||
onError(active, cb, connR, err) | ||
}) | ||
|
||
// Starts concurrent execution for the right value | ||
IORunLoop.startCancelable[B](rh, connR, { | ||
IORunLoop.startCancelable[B](timer.shift *> rh, connR, { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why isn’t shifting just the left side enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be enough, but I'm not sure (my understanding of concurrency and internals is still a little bit off 😅 ), and I also didn't want to introduce a behavior (right-hand side start immediately on the current thread) which would break the symmetry between IO.race(fa, fb)
and IO.race(fb, fa).map(_.swap)
.
And if optimizing it away would not be a breaking change, we can always do it later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. Also if you fork one thread, then execute the other immediately, it's not really a fair race 🙂
I think that we'll also be able to do an optimization like I introduced in Monix, where we can detect tasks that are forked for the obvious cases. For example we can detect shift *> delay(f)
and prevent an extra shift
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... such optimizations are dirty though, I wouldn't do it in this PR. I'd rather merge the PR ASAP, because @mpilquist needs it in FS2.
|
||
class IOAppTests extends AsyncFunSuite with Matchers with BeforeAndAfterAll with TestUtils { | ||
test("exits with specified code") { | ||
IOAppPlatform.mainFiber(Array.empty, Eval.now(implicitly))(_ => IO.pure(ExitCode(42))) | ||
.flatMap(_.join) | ||
.unsafeToFuture | ||
.value shouldEqual (Some(Success(42))) | ||
.map(_ shouldEqual 42) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn’t this be an onComplete or something? If we get an exception will this be called?
Also, does the test framework just automatically handle Future? Why don’t we need to Await?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test framework does. You return a Future[Assertion]
, and this is what I'm creating with map
. A failed Future
gets reported as a failed test.
site/src/main/tut/datatypes/io.md
Outdated
@@ -1277,11 +1260,11 @@ There is also `cats.Traverse.sequence` which does this synchronously. | |||
|
|||
### parTraverse | |||
|
|||
If you have a list of data and a way of turning each item into an IO, but you want a single IO for the results you can use `parTraverse` to run the steps in parallel. The IO tasks must be asynchronous, which if they are not you can use shift. | |||
If you have a list of data and a way of turning each item into an IO, but you want a single IO for the results you can use `parSequence` to run the steps in parallel. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
parTraverse
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for noticing 👍
- Reverted misfix in documentation. Added note on creating Timer[IO]s. - Fixed failing MVar tests - Moved timer.shift from IO.start deeper to IOStart
Codecov Report
@@ Coverage Diff @@
## master #232 +/- ##
==========================================
+ Coverage 87.81% 88.73% +0.92%
==========================================
Files 60 58 -2
Lines 1494 1492 -2
Branches 148 145 -3
==========================================
+ Hits 1312 1324 +12
+ Misses 182 168 -14 |
Fixes #229.