Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move uncancelable down to Bracket. Add laws specifying interaction. #241

Merged
merged 10 commits into from
May 28, 2018
47 changes: 47 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,53 @@ val mimaSettings = Seq(
exclude[ReversedMissingMethodProblem]("cats.effect.Effect.toIO"),
exclude[ReversedMissingMethodProblem]("cats.effect.ConcurrentEffect.toIO"),

// Uncancelable moved down to Bracket
exclude[DirectMissingMethodProblem]("cats.effect.Concurrent#Ops.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.uncancelable"),
exclude[UpdateForwarderBodyProblem]("cats.effect.Concurrent#WriterTConcurrent.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[ReversedMissingMethodProblem]("cats.effect.Sync#OptionTSync.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[ReversedMissingMethodProblem]("cats.effect.Sync#WriterTSync.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.uncancelable"),
exclude[UpdateForwarderBodyProblem]("cats.effect.Concurrent#EitherTConcurrent.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[UpdateForwarderBodyProblem]("cats.effect.Concurrent#OptionTConcurrent.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.uncancelable"),
exclude[ReversedMissingMethodProblem]("cats.effect.Sync#StateTSync.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.uncancelable"),
exclude[ReversedMissingMethodProblem]("cats.effect.Sync#EitherTSync.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.uncancelable"),
exclude[UpdateForwarderBodyProblem]("cats.effect.Concurrent#StateTConcurrent.uncancelable"),
exclude[InheritedNewAbstractMethodProblem]("cats.effect.Bracket.guarantee"),

// Require Timer[IO] for auto-shifting now
exclude[DirectMissingMethodProblem]("cats.effect.IO.start"),
Expand Down
24 changes: 24 additions & 0 deletions core/shared/src/main/scala/cats/effect/Bracket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,27 @@ trait Bracket[F[_], E] extends MonadError[F, E] {
(release: A => F[Unit]): F[B] =
bracketCase(acquire)(use)((a, _) => release(a))

/**
* Operation meant for ensuring a given task continues execution even
* when interrupted.
*
* For example, this equivalence holds
*
* {{{
* F.uncancelable { F.bracketCase(F.unit)(_ => fa) { case Cancelled(_) => action; case _ => action2 } } <-> F.ensuring(fa)(action2)
* }}}
*/
def uncancelable[A](fa: F[A]): F[A] =
bracket(fa)(pure)(_ => unit)

/**
* Operation meant for specifying tasks with specific logic having guaranteed to
* execute whenever source task has completed, failed with an exception, or cancelled.
*
* A special case of [[bracket]], which is not meant for resource acquisition
*/
def guarantee[A](fa: F[A])(finalizer: F[Unit]): F[A] =
bracket(unit)(_ => fa)(_ => finalizer)
}

/**
Expand Down Expand Up @@ -186,5 +207,8 @@ object Bracket {
}
}
}

override def uncancelable[A](fa: Kleisli[F, R, A]): Kleisli[F, R, A] =
Kleisli { r => F.uncancelable(fa.run(r)) }
}
}
90 changes: 37 additions & 53 deletions core/shared/src/main/scala/cats/effect/Concurrent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,43 @@ import scala.util.Either
* without exposing and forcing the user to work with cancellation
* tokens. An [[Async]] data type cannot expose for example a `start`
* operation that is safe.
*
* == Resource-safety ==
* [[Concurrent]] data type is also required to cooperate with [[Bracket]]:
*
*
* For `uncancelable`, the [[Fiber.cancel cancel]] signal has no effect on the
* result of [[Fiber.join join]] and that the cancelable token returned by
* [[ConcurrentEffect.runCancelable]] on evaluation will have no effect.
*
* So `uncancelable` must undo the cancellation mechanism of [[Concurrent!.cancelable cancelable]],
* with this equivalence:
*
* {{{
* F.uncancelable(F.cancelable { cb => f(cb); io }) <-> F.async(f)
* }}}
*
* Sample:
*
* {{{
* val F = Concurrent[IO]
* val timer = Timer[IO]
*
* // Normally Timer#sleep yields cancelable tasks
* val tick = F.uncancelable(timer.sleep(10.seconds))
*
* // This prints "Tick!" after 10 seconds, even if we are
* // canceling the Fiber after start:
* for {
* fiber <- F.start(tick)
* _ <- fiber.cancel
* _ <- fiber.join
* _ <- F.delay { println("Tick!") }
* } yield ()
* }}}
*
* When doing [[Bracket.bracket bracket]] or [[Bracket.bracketCase bracketCase]],
* `acquire` and `release` operations are guaranteed to be uncancelable as well.
*/
@typeclass
@implicitNotFound("""Cannot find implicit value for Concurrent[${F}].
Expand Down Expand Up @@ -181,47 +218,6 @@ trait Concurrent[F[_]] extends Async[F] {
*/
def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): F[A]

/**
* Returns a new `F` that mirrors the source, but that is uninterruptible.
*
* This means that the [[Fiber.cancel cancel]] signal has no effect on the
* result of [[Fiber.join join]] and that the cancelable token returned by
* [[ConcurrentEffect.runCancelable]] on evaluation will have no effect.
*
* This operation is undoing the cancellation mechanism of [[cancelable]],
* with this equivalence:
*
* {{{
* F.uncancelable(F.cancelable { cb => f(cb); io }) <-> F.async(f)
* }}}
*
* Sample:
*
* {{{
* val F = Concurrent[IO]
* val timer = Timer[IO]
*
* // Normally Timer#sleep yields cancelable tasks
* val tick = F.uncancelable(timer.sleep(10.seconds))
*
* // This prints "Tick!" after 10 seconds, even if we are
* // canceling the Fiber after start:
* for {
* fiber <- F.start(tick)
* _ <- fiber.cancel
* _ <- fiber.join
* } yield {
* println("Tick!")
* }
* }}}
*
* Cancelable effects are great in race conditions, however sometimes
* this operation is necessary to ensure that the bind continuation
* of a task (the following `flatMap` operations) are also evaluated
* no matter what.
*/
def uncancelable[A](fa: F[A]): F[A]

/**
* Returns a new `F` value that mirrors the source for normal
* termination, but that triggers the given error on cancellation.
Expand Down Expand Up @@ -487,8 +483,6 @@ object Concurrent {
def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): EitherT[F, L, A] =
EitherT.liftF(F.cancelable(k))(F)

def uncancelable[A](fa: EitherT[F, L, A]): EitherT[F, L, A] =
EitherT(F.uncancelable(fa.value))

def onCancelRaiseError[A](fa: EitherT[F, L, A], e: Throwable): EitherT[F, L, A] =
EitherT(F.onCancelRaiseError(fa.value, e))
Expand Down Expand Up @@ -552,9 +546,6 @@ object Concurrent {
}
})

def uncancelable[A](fa: OptionT[F, A]): OptionT[F, A] =
OptionT(F.uncancelable(fa.value))

def onCancelRaiseError[A](fa: OptionT[F, A], e: Throwable): OptionT[F, A] =
OptionT(F.onCancelRaiseError(fa.value, e))

Expand Down Expand Up @@ -588,8 +579,6 @@ object Concurrent {
}
}

def uncancelable[A](fa: StateT[F, S, A]): StateT[F, S, A] =
fa.transformF(F.uncancelable)

def onCancelRaiseError[A](fa: StateT[F, S, A], e: Throwable): StateT[F, S, A] =
fa.transformF(F.onCancelRaiseError(_, e))
Expand All @@ -611,9 +600,6 @@ object Concurrent {
def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): WriterT[F, L, A] =
WriterT.liftF(F.cancelable(k))(L, F)

def uncancelable[A](fa: WriterT[F, L, A]): WriterT[F, L, A] =
WriterT(F.uncancelable(fa.run))

def onCancelRaiseError[A](fa: WriterT[F, L, A], e: Throwable): WriterT[F, L, A] =
WriterT(F.onCancelRaiseError(fa.run, e))

Expand Down Expand Up @@ -646,8 +632,6 @@ object Concurrent {
override def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): Kleisli[F, R, A] =
Kleisli.liftF(F.cancelable(k))

override def uncancelable[A](fa: Kleisli[F, R, A]): Kleisli[F, R, A] =
Kleisli { r => F.suspend(F.uncancelable(fa.run(r))) }

override def onCancelRaiseError[A](fa: Kleisli[F, R, A], e: Throwable): ReaderT[F, R, A] =
Kleisli { r => F.suspend(F.onCancelRaiseError(fa.run(r), e)) }
Expand Down
6 changes: 4 additions & 2 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,9 @@ private[effect] abstract class IOLowPriorityInstances extends IOParallelNewtype
(release: A => IO[Unit]): IO[B] =
acquire.bracket(use)(release)

final override def uncancelable[A](task: IO[A]): IO[A] =
task.uncancelable

final override def bracketCase[A, B](acquire: IO[A])
(use: A => IO[B])
(release: (A, ExitCase[Throwable]) => IO[Unit]): IO[B] =
Expand Down Expand Up @@ -741,8 +744,7 @@ private[effect] abstract class IOInstances extends IOLowPriorityInstances {
implicit def ioConcurrentEffect(implicit timer: Timer[IO]): ConcurrentEffect[IO] = new IOEffect with ConcurrentEffect[IO] {
final override def start[A](fa: IO[A]): IO[Fiber[IO, A]] =
fa.start
final override def uncancelable[A](fa: IO[A]): IO[A] =
fa.uncancelable

final override def onCancelRaiseError[A](fa: IO[A], e: Throwable): IO[A] =
fa.onCancelRaiseError(e)

Expand Down
15 changes: 15 additions & 0 deletions core/shared/src/main/scala/cats/effect/Sync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ object Sync {

def suspend[A](thunk: => EitherT[F, L, A]): EitherT[F, L, A] =
EitherT(F.suspend(thunk.value))

override def uncancelable[A](fa: EitherT[F, L, A]): EitherT[F, L, A] =
EitherT(F.uncancelable(fa.value))
}

private[effect] trait OptionTSync[F[_]] extends Sync[OptionT[F, ?]] {
Expand Down Expand Up @@ -156,6 +159,9 @@ object Sync {

def suspend[A](thunk: => OptionT[F, A]): OptionT[F, A] =
OptionT(F.suspend(thunk.value))

override def uncancelable[A](fa: OptionT[F, A]): OptionT[F, A] =
OptionT(F.uncancelable(fa.value))
}

private[effect] trait StateTSync[F[_], S] extends Sync[StateT[F, S, ?]] {
Expand All @@ -182,6 +188,9 @@ object Sync {
}
}

override def uncancelable[A](fa: StateT[F, S, A]): StateT[F, S, A] =
fa.transformF(F.uncancelable)

def flatMap[A, B](fa: StateT[F, S, A])(f: A => StateT[F, S, B]): StateT[F, S, B] =
fa.flatMap(f)

Expand Down Expand Up @@ -218,6 +227,9 @@ object Sync {
}
}

override def uncancelable[A](fa: WriterT[F, L, A]): WriterT[F, L, A] =
WriterT(F.uncancelable(fa.run))

def flatMap[A, B](fa: WriterT[F, L, A])(f: A => WriterT[F, L, B]): WriterT[F, L, B] =
fa.flatMap(f)

Expand All @@ -242,5 +254,8 @@ object Sync {

def suspend[A](thunk: => Kleisli[F, R, A]): Kleisli[F, R, A] =
Kleisli(r => F.suspend(thunk.run(r)))

override def uncancelable[A](fa: Kleisli[F, R, A]): Kleisli[F, R, A] =
Kleisli { r => F.suspend(F.uncancelable(fa.run(r))) }
}
}
16 changes: 14 additions & 2 deletions laws/shared/src/main/scala/cats/effect/laws/BracketLaws.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ trait BracketLaws[F[_], E] extends MonadErrorLaws[F, E] {
def bracketCaseWithPureUnitIsEqvMap[A, B](fa: F[A], f: A => B) =
F.bracketCase(fa)(a => f(a).pure[F])((_, _) => F.unit) <-> F.map(fa)(f)

def bracketCaseWithPureUnitIsEqvFlatMap[A, B](fa: F[A], f: A => F[B]) =
F.bracketCase(fa)(f)((_, _) => F.unit) <-> F.flatMap(fa)(f)
def bracketCaseWithPureUnitIsUncancelable[A, B](fa: F[A], f: A => F[B]) =
F.bracketCase(fa)(f)((_, _) => F.unit) <-> F.uncancelable(fa).flatMap(f)

def bracketCaseFailureInAcquisitionRemainsFailure[A, B](e: E, f: A => F[B], release: F[Unit]) =
F.bracketCase(F.raiseError[A](e))(f)((_, _) => release) <-> F.raiseError(e)
Expand All @@ -38,6 +38,18 @@ trait BracketLaws[F[_], E] extends MonadErrorLaws[F, E] {

def bracketIsDerivedFromBracketCase[A, B](fa: F[A], use: A => F[B], release: A => F[Unit]) =
F.bracket(fa)(use)(release) <-> F.bracketCase(fa)(use)((a, _) => release(a))

def uncancelablePreventsCanceledCase[A](fa: F[A], onCancel: F[Unit], onFinish: F[Unit]) =
F.uncancelable(F.bracketCase(F.unit)(_ => fa) {
case (_, ExitCase.Canceled(_)) => onCancel
case _ => onFinish
}) <-> F.uncancelable(F.guarantee(fa)(onFinish))

def acquireAndReleaseAreUncancelable[A, B](fa: F[A], use: A => F[B], release: A => F[Unit]) =
F.bracket(F.uncancelable(fa))(use)(a => F.uncancelable(release(a))) <-> F.bracket(fa)(use)(release)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have canceled F[A] in our arbitraries to make this a more interesting law?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Canceled F[A]s cannot be made uncancelable :) We need F[A]s that might get concurrently cancelled or not, which is a plenty of work that probably should be done later.


def guaranteeIsDerivedFromBracket[A](fa: F[A], finalizer: F[Unit]) =
F.guarantee(fa)(finalizer) <-> F.bracket(F.unit)(_ => fa)(_ => finalizer)
}

object BracketLaws {
Expand Down
30 changes: 29 additions & 1 deletion laws/shared/src/main/scala/cats/effect/laws/ConcurrentLaws.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package cats.effect
package laws

import cats.effect.concurrent.Deferred
import cats.effect.concurrent.{Deferred, MVar}
import cats.laws._
import cats.syntax.all._
import scala.Predef.{identity => id}
Expand Down Expand Up @@ -90,6 +90,34 @@ trait ConcurrentLaws[F[_]] extends AsyncLaws[F] {
lh <-> F.async(_ => ())
}

def acquireIsNotCancelable[A, B](fa: F[A], b1: B, b2: B) = {
val lh =
for {
mVar <- F.liftIO(MVar[IO].of(b1))
task = F.bracket(F.liftIO(mVar.put(b2)))(_ => F.unit)(_ => F.unit)
fiber <- F.start(task)
_ <- fiber.cancel
_ <- F.liftIO(mVar.take)
out <- F.liftIO(mVar.take)
} yield out

lh <-> F.pure(b2)
}

def releaseIsNotCancelable[A, B](fa: F[A], b1: B, b2: B) = {
val lh =
for {
mVar <- F.liftIO(MVar[IO].of(b1))
task = F.bracket(F.unit)(_ => F.never[B])(_ => F.liftIO(mVar.put(b2)))
fiber <- F.start(task)
_ <- fiber.cancel
_ <- F.liftIO(mVar.take)
out <- F.liftIO(mVar.take)
} yield out

lh <-> F.pure(b2)
}

def onCancelRaiseErrorMirrorsSource[A](fa: F[A], e: Throwable) = {
F.onCancelRaiseError(fa, e) <-> fa
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ trait BracketTests[F[_], E] extends MonadErrorTests[F, E] {
"bracketCase with pure unit on release is eqv to map" -> forAll(laws.bracketCaseWithPureUnitIsEqvMap[A, B] _),
"bracketCase with failure in use and release is use" -> forAll(laws.bracketCaseEmitsUseFailure[A] _),
"bracketCase with failure in acquisition remains failure" -> forAll(laws.bracketCaseFailureInAcquisitionRemainsFailure[A, B] _),
"bracketCase with pure unit on release is eqv to flatMap" -> forAll(laws.bracketCaseWithPureUnitIsEqvFlatMap[A, B] _),
"bracket is derived from bracketCase" -> forAll(laws.bracketIsDerivedFromBracketCase[A, B] _)
"bracketCase with pure unit on release is eqv to uncancelable(..).flatMap" -> forAll(laws.bracketCaseWithPureUnitIsUncancelable[A, B] _),
"bracket is derived from bracketCase" -> forAll(laws.bracketIsDerivedFromBracketCase[A, B] _),
"uncancelable prevents Cancelled case" -> forAll(laws.uncancelablePreventsCanceledCase[A] _),
"acquire and release of bracket are uncancelable" -> forAll(laws.acquireAndReleaseAreUncancelable[A, B] _),
"guarantee is derived from bracket" -> forAll(laws.guaranteeIsDerivedFromBracket[A] _)
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ trait ConcurrentTests[F[_]] extends AsyncTests[F] {
"start.flatMap(_.cancel) is unit" -> forAll(laws.startCancelIsUnit[A] _),
"uncancelable mirrors source" -> forAll(laws.uncancelableMirrorsSource[A] _),
"uncancelable prevents cancellation" -> forAll(laws.uncancelablePreventsCancelation[A] _),
"acquire of bracket is not cancellable" -> forAll(laws.acquireIsNotCancelable[A, B] _),
"release of bracket is not cancellable" -> forAll(laws.releaseIsNotCancelable[A, B] _),
"onCancelRaiseError mirrors source" -> forAll(laws.onCancelRaiseErrorMirrorsSource[A] _),
"onCancelRaiseError terminates on cancel" -> forAll(laws.onCancelRaiseErrorTerminatesOnCancel[A] _),
"onCancelRaiseError can cancel source" -> forAll(laws.onCancelRaiseErrorCanCancelSource[A] _),
Expand Down