Skip to content

Commit

Permalink
Merge pull request #95 from fraktalio/feature/arrow-effects
Browse files Browse the repository at this point in the history
Arrow (from `either` to `Effect`)
  • Loading branch information
idugalic authored Jun 4, 2022
2 parents 27c5a5e + 6969dd7 commit 7b71739
Show file tree
Hide file tree
Showing 13 changed files with 175 additions and 141 deletions.
15 changes: 10 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,10 @@ Notice that `Decider` implements an interface `IDecider` to communicate the cont

- with identity element `Decider<Nothing?, Unit, Nothing?>`

> A monoid is a type together with a binary operation (combine) over that type, satisfying associativity and having an identity/empty element.
> Associativity facilitates parallelization by giving us the freedom to break problems into chunks that can be computed in parallel.
> A monoid is a type together with a binary operation (combine) over that type, satisfying associativity and having an
> identity/empty element.
> Associativity facilitates parallelization by giving us the freedom to break problems into chunks that can be computed
> in parallel.

We can now construct event-sourcing or/and state-storing aggregate by using the same `decider`.
Expand Down Expand Up @@ -318,8 +320,10 @@ Notice that `View` implements an interface `IView` to communicate the contract.
- `View<in Si, out So, in E?>.combine(y: View<in Si2, out So2, in E2?>): View<Pair<Si, Si2>, Pair<So, So2>, E_SUPER>`
- with identity element `View<Unit, Nothing?>`

> A monoid is a type together with a binary operation (combine) over that type, satisfying associativity and having an identity/empty element.
> Associativity facilitates parallelization by giving us the freedom to break problems into chunks that can be computed in parallel.
> A monoid is a type together with a binary operation (combine) over that type, satisfying associativity and having an
> identity/empty element.
> Associativity facilitates parallelization by giving us the freedom to break problems into chunks that can be computed
> in parallel.
We can now construct `materialized` view by using this `view`.

Expand Down Expand Up @@ -446,7 +450,8 @@ private fun <C, E> CoroutineScope.commandActor(
}
```

> [Actors](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html) are marked as @ObsoleteCoroutinesApi by Kotlin at the moment.
> [Actors](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html)
> are marked as @ObsoleteCoroutinesApi by Kotlin at the moment.
## Kotlin

Expand Down
3 changes: 3 additions & 0 deletions application-arrow/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ kotlin {
nativeTarget.compilations.all {
kotlinOptions.verbose = true
}
nativeTarget.binaries.all {
freeCompilerArgs += "-Xlazy-ir-for-caches=disable"
}

sourceSets {
val commonMain by getting {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@

package com.fraktalio.fmodel.application

import arrow.core.Either
import arrow.core.Either.Left
import arrow.core.Either.Right
import arrow.core.continuations.Effect
import arrow.core.continuations.effect
import com.fraktalio.fmodel.application.Error.CommandHandlingFailed
import com.fraktalio.fmodel.application.Error.CommandPublishingFailed
import kotlinx.coroutines.FlowPreview
Expand All @@ -31,55 +30,53 @@ import kotlinx.coroutines.flow.map
* Extension function - Handles the command message of type [C]
*
* @param command Command message of type [C]
* @return [Flow] of [Either] [Error] or Events of type [E]
* @return [Flow] of [Effect] (either [Error] or Events of type [E])
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@FlowPreview
fun <C, S, E> EventSourcingAggregate<C, S, E>.handleEither(command: C): Flow<Either<Error, E>> =
fun <C, S, E> EventSourcingAggregate<C, S, E>.handleWithEffect(command: C): Flow<Effect<Error, E>> =
command
.fetchEvents()
.computeNewEvents(command)
.save()
.map { Right(it) }
.catch<Either<Error, E>> {
emit(Left(CommandHandlingFailed(command)))
}
.map { effect<Error, E> { it } }
.catch { emit(effect { shift(CommandHandlingFailed(command)) }) }

/**
* Extension function - Handles the flow of command messages of type [C]
*
* @param commands [Flow] of Command messages of type [C]
* @return [Flow] of [Either] [Error] or Events of type [E]
* @return [Flow] of [Effect] (either [Error] or Events of type [E])
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@FlowPreview
fun <C, S, E> EventSourcingAggregate<C, S, E>.handleEither(commands: Flow<C>): Flow<Either<Error, E>> =
fun <C, S, E> EventSourcingAggregate<C, S, E>.handleWithEffect(commands: Flow<C>): Flow<Effect<Error, E>> =
commands
.flatMapConcat { handleEither(it) }
.catch { emit(Left(CommandPublishingFailed(it))) }
.flatMapConcat { handleWithEffect(it) }
.catch { emit(effect { shift(CommandPublishingFailed(it)) }) }

/**
* Extension function - Publishes the command of type [C] to the event sourcing aggregate of type [EventSourcingAggregate]<[C], *, [E]>
* @receiver command of type [C]
* @param aggregate of type [EventSourcingAggregate]<[C], *, [E]>
* @return the [Flow] of [Either] [Error] or successfully stored Events of type [E]
* @return the [Flow] of [Effect] (either [Error] or successfully stored Events of type [E])
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@FlowPreview
fun <C, E> C.publishEitherTo(aggregate: EventSourcingAggregate<C, *, E>): Flow<Either<Error, E>> =
aggregate.handleEither(this)
fun <C, E> C.publishWithEffect(aggregate: EventSourcingAggregate<C, *, E>): Flow<Effect<Error, E>> =
aggregate.handleWithEffect(this)

/**
* Extension function - Publishes [Flow] of commands of type [C] to the event sourcing aggregate of type [EventSourcingAggregate]<[C], *, [E]>
* @receiver [Flow] of commands of type [C]
* @param aggregate of type [EventSourcingAggregate]<[C], *, [E]>
* @return the [Flow] of [Either] [Error] or successfully stored Events of type [E]
* @return the [Flow] of [Effect] (either [Error] or successfully stored Events of type [E])
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@FlowPreview
fun <C, E> Flow<C>.publishEitherTo(aggregate: EventSourcingAggregate<C, *, E>): Flow<Either<Error, E>> =
aggregate.handleEither(this)
fun <C, E> Flow<C>.publishWithEffect(aggregate: EventSourcingAggregate<C, *, E>): Flow<Effect<Error, E>> =
aggregate.handleWithEffect(this)
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

package com.fraktalio.fmodel.application

import arrow.core.Either
import arrow.core.Either.Companion.catch
import arrow.core.Either.Left
import arrow.core.computations.either
import arrow.core.continuations.Effect
import arrow.core.continuations.effect
import arrow.core.nonFatalOrThrow
import com.fraktalio.fmodel.application.Error.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
Expand All @@ -29,84 +28,95 @@ import kotlinx.coroutines.flow.map
* Extension function - Handles the event of type [E]
*
* @param event Event of type [E] to be handled
* @return [Either] [Error] or State of type [S]
* @return [Effect] (either [Error] or State of type [S])
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
suspend fun <S, E> MaterializedView<S, E>.handleEither(event: E): Either<Error, S> {
suspend fun <S, E> MaterializedView<S, E>.handleWithEffect(event: E): Effect<Error, S> {
/**
* Inner function - Computes new State based on the Event or fails.
*
* @param event of type [E]
* @return The newly computed state of type [S] or [Error]
*/
fun S?.eitherComputeNewStateOrFail(event: E): Either<Error, S> =
catch {
computeNewState(event)
}.mapLeft { throwable -> CalculatingNewViewStateFailed(this, event, throwable) }
fun S?.computeNewStateWithEffect(event: E): Effect<Error, S> =
effect {
try {
computeNewState(event)
} catch (t: Throwable) {
shift(CalculatingNewViewStateFailed(this@computeNewStateWithEffect, event, t.nonFatalOrThrow()))
}
}

/**
* Inner function - Fetch state - either version
*
* @receiver Event of type [E]
* @return [Either] [Error] or the State of type [S]?
* @return [Effect] (either [Error] or the State of type [S]?)
*/
suspend fun E.eitherFetchStateOrFail(): Either<FetchingViewStateFailed<E>, S?> =
catch {
fetchState()
}.mapLeft { throwable -> FetchingViewStateFailed(this, throwable) }
suspend fun E.fetchStateWithEffect(): Effect<Error, S?> =
effect {
try {
fetchState()
} catch (t: Throwable) {
shift(FetchingViewStateFailed(this@fetchStateWithEffect, t.nonFatalOrThrow()))
}
}

/**
* Inner function - Save state - either version
*
* @receiver State of type [S]
* @return [Either] [Error] or the newly saved State of type [S]
* @return [Effect] (either [Error] or the newly saved State of type [S])
*/
suspend fun S.eitherSaveOrFail(): Either<StoringStateFailed<S>, S> =
catch {
this.save()
}.mapLeft { throwable -> StoringStateFailed(this, throwable) }
suspend fun S.saveWithEffect(): Effect<Error, S> =
effect {
try {
save()
} catch (t: Throwable) {
shift(StoringStateFailed(this@saveWithEffect, t.nonFatalOrThrow()))
}
}

// Arrow provides a Monad instance for Either. Except for the types signatures, our program remains unchanged when we compute over Either. All values on the left side assume to be Right biased and, whenever a Left value is found, the computation short-circuits, producing a result that is compatible with the function type signature.
return either {
event.eitherFetchStateOrFail().bind()
.eitherComputeNewStateOrFail(event).bind()
.eitherSaveOrFail().bind()
return effect {
event.fetchStateWithEffect().bind()
.computeNewStateWithEffect(event).bind()
.saveWithEffect().bind()
}
}

/**
* Extension function - Handles the flow of events of type [E]
*
* @param events Flow of Events of type [E] to be handled
* @return [Flow] of [Either] [Error] or State of type [S]
* @return [Flow] of [Effect] (either [Error] or State of type [S])
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
fun <S, E> MaterializedView<S, E>.handleEither(events: Flow<E>): Flow<Either<Error, S>> =
fun <S, E> MaterializedView<S, E>.handleWithEffect(events: Flow<E>): Flow<Effect<Error, S>> =
events
.map { handleEither(it) }
.catch { emit(Left(EventPublishingFailed(it))) }
.map { handleWithEffect(it) }
.catch { emit(effect { shift(EventPublishingFailed(it)) }) }


/**
* Extension function - Publishes the event of type [E] to the materialized view of type [MaterializedView]<[S], [E]>
* @receiver event of type [E]
* @param materializedView of type [MaterializedView]<[S], [E]>
* @return [Either] [Error] or the successfully stored State of type [S]
* @return [Effect] (either [Error] or the successfully stored State of type [S])
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
suspend fun <S, E> E.publishEitherTo(materializedView: MaterializedView<S, E>): Either<Error, S> =
materializedView.handleEither(this)
suspend fun <S, E> E.publishWithEffect(materializedView: MaterializedView<S, E>): Effect<Error, S> =
materializedView.handleWithEffect(this)

/**
* Extension function - Publishes the event of type [E] to the materialized view of type [MaterializedView]<[S], [E]>
* @receiver [Flow] of events of type [E]
* @param materializedView of type [MaterializedView]<[S], [E]>
* @return [Flow] of [Either] [Error] or the successfully stored State of type [S]
* @return [Flow] of [Effect] (either [Error] or the successfully stored State of type [S])
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
fun <S, E> Flow<E>.publishEitherTo(materializedView: MaterializedView<S, E>): Flow<Either<Error, S>> =
materializedView.handleEither(this)
fun <S, E> Flow<E>.publishWithEffect(materializedView: MaterializedView<S, E>): Flow<Effect<Error, S>> =
materializedView.handleWithEffect(this)
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@

package com.fraktalio.fmodel.application

import arrow.core.Either
import arrow.core.Either.Left
import arrow.core.Either.Right
import arrow.core.continuations.Effect
import arrow.core.continuations.effect
import com.fraktalio.fmodel.application.Error.ActionResultHandlingFailed
import com.fraktalio.fmodel.application.Error.ActionResultPublishingFailed
import kotlinx.coroutines.FlowPreview
Expand All @@ -31,51 +30,52 @@ import kotlinx.coroutines.flow.map
* Extension function - Handles the action result of type [AR].
*
* @param actionResult Action Result represent the outcome of some action you want to handle in some way
* @return [Flow] of [Either] [Error] or Actions of type [A]
* @return [Flow] of [Effect] (either [Error] or Actions of type [A])
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
fun <AR, A> SagaManager<AR, A>.handleEither(actionResult: AR): Flow<Either<Error, A>> =
fun <AR, A> SagaManager<AR, A>.handleWithEffect(actionResult: AR): Flow<Effect<Error, A>> =
actionResult
.computeNewActions()
.publish()
.map { Right(it) }
.catch<Either<Error, A>> { emit(Left(ActionResultHandlingFailed(actionResult))) }
.map { effect<Error, A> { it } }
.catch { emit(effect { shift(ActionResultHandlingFailed(actionResult)) }) }

/**
* Extension function - Handles the the [Flow] of action results of type [AR].
* Extension function - Handles the [Flow] of action results of type [AR].
*
* @param actionResults Action Results represent the outcome of some action you want to handle in some way
* @return [Flow] of [Either] [Error] or Actions of type [A]
* @return [Flow] of [Effect] (either [Error] or Actions of type [A])
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@FlowPreview
fun <AR, A> SagaManager<AR, A>.handleEither(actionResults: Flow<AR>): Flow<Either<Error, A>> =
fun <AR, A> SagaManager<AR, A>.handleWithEffect(actionResults: Flow<AR>): Flow<Effect<Error, A>> =
actionResults
.flatMapConcat { handleEither(it) }
.catch { emit(Left(ActionResultPublishingFailed(it))) }
.flatMapConcat { handleWithEffect(it) }
.catch { emit(effect { shift(ActionResultPublishingFailed(it)) }) }


/**
* Extension function - Publishes the action result of type [AR] to the saga manager of type [SagaManager]<[AR], [A]>
* @receiver action result of type [AR]
* @param sagaManager of type [SagaManager]<[AR], [A]>
* @return the [Flow] of [Either] [Error] or successfully published Actions of type [A]
* @return the [Flow] of [Effect] (either [Error] or successfully published Actions of type [A])
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
fun <AR, A> AR.publishEitherTo(sagaManager: SagaManager<AR, A>): Flow<Either<Error, A>> = sagaManager.handleEither(this)
fun <AR, A> AR.publishWithEffect(sagaManager: SagaManager<AR, A>): Flow<Effect<Error, A>> =
sagaManager.handleWithEffect(this)

/**
* Extension function - Publishes the action result of type [AR] to the saga manager of type [SagaManager]<[AR], [A]>
* @receiver [Flow] of action results of type [AR]
* @param sagaManager of type [SagaManager]<[AR], [A]>
* @return the [Flow] of [Either] [Error] or successfully published Actions of type [A]
* @return the [Flow] of [Effect] (either [Error] or successfully published Actions of type [A])
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@FlowPreview
fun <AR, A> Flow<AR>.publishEitherTo(sagaManager: SagaManager<AR, A>): Flow<Either<Error, A>> =
sagaManager.handleEither(this)
fun <AR, A> Flow<AR>.publishWithEffect(sagaManager: SagaManager<AR, A>): Flow<Effect<Error, A>> =
sagaManager.handleWithEffect(this)

Loading

0 comments on commit 7b71739

Please sign in to comment.