Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Require Timer and shift automatically in concurrent IO operations #232

Merged
merged 6 commits into from
May 23, 2018

Conversation

oleg-py
Copy link
Contributor

@oleg-py oleg-py commented May 19, 2018

Fixes #229.

val fakeErr = new PrintStream(outStream)
System.setErr(fakeErr)

Future.fromTry(Try(thunk)).flatten.andThen {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flatten isn't available on 2.11 unless you import cats syntax :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, will fix when I get to code :)

@mpilquist mpilquist self-requested a review May 19, 2018 14:59
def race[A, B](lh: IO[A], rh: IO[B]): IO[Either[A, B]] =
IORace.simple(lh, rh)
def race[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[A, B]] =
IORace.simple(timer.shift *> lh, timer.shift *> rh)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should just take timer, lh, and rh directly on simple and delay or even avoid the allocation of *>

final def start: IO[Fiber[IO, A @uncheckedVariance]] =
IOStart(this)
final def start(implicit timer: Timer[IO]): IO[Fiber[IO, A @uncheckedVariance]] =
IOStart(timer.shift *> this)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we just make IOStart(timer, this) so if we never actually evaluatue the result we save some allocations.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 agree with @johnynek

def racePair[A, B](lh: IO[A], rh: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] =
IORace.pair(lh, rh)
def racePair[A, B](lh: IO[A], rh: IO[B])(implicit timer: Timer[IO]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]] =
IORace.pair(timer.shift *> lh, timer.shift *> rh)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment about just taking the timer here vs manually shifting.

final def start: IO[Fiber[IO, A @uncheckedVariance]] =
IOStart(this)
final def start(implicit timer: Timer[IO]): IO[Fiber[IO, A @uncheckedVariance]] =
IOStart(timer.shift *> this)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 agree with @johnynek

@@ -674,17 +671,63 @@ private[effect] abstract class IOLowPriorityInstances extends IOParallelNewtype
}

implicit def ioSemigroup[A: Semigroup]: Semigroup[IO[A]] = new IOSemigroup[A]

implicit val ioEffect: Effect[IO] = new IOEffect
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nice thing about this is that we get to keep the old ioEffect, with the previous type.


implicit val ioEffect: Effect[IO] = new IOEffect

private[effect] class IOEffect extends Effect[IO] with StackSafeMonad[IO] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WTF I did not know there's a StackSafeMonad in Cats since Jun 20, 2017.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not that known because it's not appearing in user code, I think. But yeah, it's nice to not copy-paste same tailRecM across all your types :)

override def unit: IO[Unit] =
IO.unit

override def map[A, B](fa: IO[A])(f: A => B): IO[B] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would mark all of these defs as final.

Supposedly it helps the JVM optimize code. Never seen concrete evidence of it, but unless you want to leave the door open for overrides, there's no point in keeping them non-final.

import IO.Par.{unwrap, apply => par}

override def pure[A](x: A): IO.Par[A] =
par(IO.pure(x))
override def map2[A, B, Z](fa: IO.Par[A], fb: IO.Par[B])(f: (A, B) => Z): IO.Par[Z] =
par(IOParMap(unwrap(fa), unwrap(fb))(f))
par(IOParMap(timer.shift *> unwrap(fa), timer.shift *> unwrap(fb))(f))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to deploy the same trick I did in Monix for detecting already forked tasks.

But it doesn't have to happen in this PR, plus it probably needs good validation that it works in Monix first 🙂

@oleg-py
Copy link
Contributor Author

oleg-py commented May 20, 2018

@alexandru there's a problem with MVar tests on JS with auto-shifting, can you take a look? 😄

IORunLoop.startCancelable(fa, connA, callbackA(connB))
IORunLoop.startCancelable(fb, connB, callbackB(connA))
IORunLoop.startCancelable(timer.shift *> fa, connA, callbackA(connB))
IORunLoop.startCancelable(timer.shift *> fb, connB, callbackB(connA))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to shift both? Isn’t shifting just fa enough?

case Right(a) =>
onSuccess(active, connR, cb, Left(a))
case Left(err) =>
onError(active, cb, connR, err)
})

// Starts concurrent execution for the right value
IORunLoop.startCancelable[B](rh, connR, {
IORunLoop.startCancelable[B](timer.shift *> rh, connR, {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn’t shifting just the left side enough?

Copy link
Contributor Author

@oleg-py oleg-py May 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be enough, but I'm not sure (my understanding of concurrency and internals is still a little bit off 😅 ), and I also didn't want to introduce a behavior (right-hand side start immediately on the current thread) which would break the symmetry between IO.race(fa, fb) and IO.race(fb, fa).map(_.swap).

And if optimizing it away would not be a breaking change, we can always do it later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Also if you fork one thread, then execute the other immediately, it's not really a fair race 🙂

I think that we'll also be able to do an optimization like I introduced in Monix, where we can detect tasks that are forked for the obvious cases. For example we can detect shift *> delay(f) and prevent an extra shift.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... such optimizations are dirty though, I wouldn't do it in this PR. I'd rather merge the PR ASAP, because @mpilquist needs it in FS2.


class IOAppTests extends AsyncFunSuite with Matchers with BeforeAndAfterAll with TestUtils {
test("exits with specified code") {
IOAppPlatform.mainFiber(Array.empty, Eval.now(implicitly))(_ => IO.pure(ExitCode(42)))
.flatMap(_.join)
.unsafeToFuture
.value shouldEqual (Some(Success(42)))
.map(_ shouldEqual 42)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn’t this be an onComplete or something? If we get an exception will this be called?

Also, does the test framework just automatically handle Future? Why don’t we need to Await?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test framework does. You return a Future[Assertion], and this is what I'm creating with map. A failed Future gets reported as a failed test.

@@ -1277,11 +1260,11 @@ There is also `cats.Traverse.sequence` which does this synchronously.

### parTraverse

If you have a list of data and a way of turning each item into an IO, but you want a single IO for the results you can use `parTraverse` to run the steps in parallel. The IO tasks must be asynchronous, which if they are not you can use shift.
If you have a list of data and a way of turning each item into an IO, but you want a single IO for the results you can use `parSequence` to run the steps in parallel.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parTraverse?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for noticing 👍

oleg-py added 2 commits May 22, 2018 16:23
- Reverted misfix in documentation. Added note on creating Timer[IO]s.
- Fixed failing MVar tests
- Moved timer.shift from IO.start deeper to IOStart
@codecov-io
Copy link

Codecov Report

Merging #232 into master will increase coverage by 0.92%.
The diff coverage is 95.74%.

@@            Coverage Diff             @@
##           master     #232      +/-   ##
==========================================
+ Coverage   87.81%   88.73%   +0.92%     
==========================================
  Files          60       58       -2     
  Lines        1494     1492       -2     
  Branches      148      145       -3     
==========================================
+ Hits         1312     1324      +12     
+ Misses        182      168      -14

@alexandru alexandru added this to the 1.0.0-RC2 milestone May 23, 2018
@alexandru alexandru merged commit 8ed6e71 into typelevel:master May 23, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants