Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Commit

Permalink
Automated event handler invocation
Browse files Browse the repository at this point in the history
- applications don't need to call onEvent in a persist handler anymore
- closes #93

Further changes:

- command handler can now be defined as override def onCommand = ...
- event handler can now be defined as override def onEvent = ...
- snapshot handler can now be defined as override def onSnapshot = ...
  • Loading branch information
krasserm committed Dec 7, 2015
1 parent 278f0cc commit dfb35b5
Show file tree
Hide file tree
Showing 34 changed files with 331 additions and 309 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ import scala.util._

object EventsourcedActorCausalitySpec {
class Collaborator(val id: String, val eventLog: ActorRef, override val sharedClockEntry: Boolean, handles: Set[String], probe: ActorRef) extends EventsourcedActor {
val onCommand: Receive = {
def onCommand = {
case s: String => persist(s) {
case Success(e) => onEvent(e)
case Success(e) =>
case Failure(e) => throw e
}
}

val onEvent: Receive = {
def onEvent = {
case s: String if handles.contains(s) =>
probe ! ((s, lastVectorTimestamp, currentVectorTime))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ object EventsourcedActorIntegrationSpec {
case class Cmd(payloads: String*)
case class State(state: Vector[String])

class SampleActor(val id: String, val eventLog: ActorRef, probe: ActorRef) extends EventsourcedActor {
override val onCommand: Receive = {
class ReplyActor(val id: String, val eventLog: ActorRef) extends EventsourcedActor {
override def onCommand = {
case "reply-success" => persist("okay") {
case Success(r) => sender() ! r
case Failure(_) => sender() ! "unexpected failure"
Expand All @@ -40,6 +40,15 @@ object EventsourcedActorIntegrationSpec {
case Success(_) => sender() ! "unexpected success"
case Failure(e) => sender() ! e.getMessage
}
}

override def onEvent = {
case evt: String =>
}
}

class BatchActor(val id: String, val eventLog: ActorRef, probe: ActorRef) extends EventsourcedActor {
override def onCommand = {
case "boom" =>
throw boom
case Cmd(ps @ _*) =>
Expand All @@ -51,62 +60,62 @@ object EventsourcedActorIntegrationSpec {
}
}

override val onEvent: Receive = {
case evt: String => probe ! evt
override def onEvent = {
case evt: String => if (recovering) probe ! evt
}
}

class AccActor(val id: String, val eventLog: ActorRef, probe: ActorRef) extends EventsourcedActor {
var acc: Vector[String] = Vector.empty

override val onCommand: Receive = {
override def onCommand = {
case "get-acc" => sender() ! acc
case s: String => persist(s) {
case Success(r) => onEvent(r)
case Success(r) =>
case Failure(e) => throw e
}
}

override val onEvent: Receive = {
override def onEvent = {
case s: String =>
acc = acc :+ s
if (acc.size == 4) probe ! acc
}
}

class ConfirmedDeliveryActor(val id: String, val eventLog: ActorRef, probe: ActorRef) extends EventsourcedActor with ConfirmedDelivery {
override val onCommand: Receive = {
override def onCommand = {
case "boom" => throw boom
case "end" => probe ! "end"
case "cmd-1" => persist("evt-1")(_ => probe ! "out-1")
case "cmd-2" => persist("evt-2")(r => onEvent(r.get))
case "cmd-2-confirm" => persist("evt-2-confirm")(r => onEvent(r.get))
case "cmd-2" => persist("evt-2")(r => ())
case "cmd-2-confirm" => persist("evt-2-confirm")(r => ())
}

override val onEvent: Receive = {
override def onEvent = {
case "evt-2" => deliver("2", "out-2", probe.path)
case "evt-2-confirm" => confirm("2")
}
}

class ConditionalActor(val id: String, val eventLog: ActorRef, probe: ActorRef) extends EventsourcedActor with ConditionalRequests {
override val onCommand: Receive = {
override def onCommand = {
case "persist" => persist("a")(r => probe ! r.get)
case "persist-mute" => persist("a")(_ => ())
case other => probe ! other
}

override val onEvent: Receive = {
override def onEvent = {
case "a" =>
}
}

class ConditionalView(val id: String, val eventLog: ActorRef, probe: ActorRef) extends EventsourcedView with ConditionalRequests {
override val onCommand: Receive = {
override def onCommand = {
case other => probe ! other
}

override val onEvent: Receive = {
override def onEvent = {
case "a" =>
}
}
Expand All @@ -117,13 +126,13 @@ object EventsourcedActorIntegrationSpec {
class CollabActor(val id: String, val eventLog: ActorRef, probe: ActorRef) extends EventsourcedActor {
var initialized = false

override val onCommand: Receive = {
override def onCommand = {
case CollabCmd(to) =>
persist(CollabEvt(to, id))(_ => ())
initialized = true
}

override val onEvent: Receive = {
override def onEvent = {
case evt @ CollabEvt(`id`, from) =>
if (initialized) probe ! lastVectorTimestamp else self ! CollabCmd(from)
}
Expand All @@ -136,41 +145,41 @@ object EventsourcedActorIntegrationSpec {
override val eventLog: ActorRef,
probe: ActorRef) extends EventsourcedActor {

override val onCommand: Receive = {
override def onCommand = {
case Route(s, destinations) => persist(s, destinations) {
case Success(s) => onEvent(s)
case Success(s) =>
case Failure(e) => throw e
}
}

override val onEvent: Receive = {
override def onEvent = {
case s: String => probe ! s
}
}

class SnapshotActor(val id: String, val eventLog: ActorRef, probe: ActorRef) extends EventsourcedActor {
var state: Vector[String] = Vector.empty

override val onCommand: Receive = {
override def onCommand = {
case "boom" =>
throw boom
case "snap" => save(State(state)) {
case Success(m) => sender() ! m.sequenceNr
case Failure(e) => throw e
}
case s: String => persist(s) {
case Success(r) => onEvent(r)
case Success(r) =>
case Failure(e) => throw e
}
}

override val onEvent: Receive = {
override def onEvent = {
case s: String =>
state = state :+ s
probe ! state
}

override val onSnapshot: Receive = {
override def onSnapshot = {
case State(s) =>
this.state = s
probe ! state
Expand All @@ -180,7 +189,7 @@ object EventsourcedActorIntegrationSpec {
class SnapshotView(val id: String, val eventLog: ActorRef, probe: ActorRef) extends EventsourcedView {
var state: Vector[String] = Vector.empty

override val onCommand: Receive = {
override def onCommand = {
case "boom" =>
throw boom
case "snap" => save(State(state)) {
Expand All @@ -189,13 +198,13 @@ object EventsourcedActorIntegrationSpec {
}
}

override val onEvent: Receive = {
override def onEvent = {
case s: String =>
state = state :+ s"v-${s}"
probe ! state
}

override val onSnapshot: Receive = {
override def onSnapshot = {
case State(s) =>
this.state = s
probe ! state
Expand All @@ -207,19 +216,19 @@ object EventsourcedActorIntegrationSpec {

override def replayChunkSizeMax: Int = 2

override val onCommand: Receive = {
override def onCommand = {
case "boom" =>
throw boom
case "state" =>
probe ! state
case s: String =>
persist(s) {
case Success(r) => onEvent(r)
case Success(r) =>
case Failure(e) => throw e
}
}

override val onEvent: Receive = {
override def onEvent = {
case s: String => state = state :+ s
}
}
Expand All @@ -240,17 +249,18 @@ abstract class EventsourcedActorIntegrationSpec extends TestKit(ActorSystem("tes

"An EventsourcedActor" can {
"preserve the command sender when invoking the persist handler on success" in {
val actor = system.actorOf(Props(new SampleActor("1", log, probe.ref)))
val actor = system.actorOf(Props(new ReplyActor("1", log)))
actor.tell("reply-success", probe.ref)
probe.expectMsg("okay")
}
"preserve the command sender when invoking the persist handler on failure" in {
val actor = system.actorOf(Props(new SampleActor("1", log, probe.ref)))
val sdrProbe = TestProbe()
val actor = system.actorOf(Props(new ReplyActor("1", log)))
actor.tell("reply-failure", probe.ref)
probe.expectMsg("boom")
}
"persist multiple events per command as atomic batch" in {
val actor = system.actorOf(Props(new SampleActor("1", log, probe.ref)))
val actor = system.actorOf(Props(new BatchActor("1", log, probe.ref)))
actor.tell(Cmd("a", "boom", "c"), probe.ref)
probe.expectMsg("boom")
probe.expectMsg("boom")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ object EventsourcedActorThroughputSpec {
var stopTime: Long = 0L
var num: Int = 0

val onCommand: Receive = {
def onCommand = {
case "stats" =>
probe ! s"${(1000.0 * 1000 * 1000 * num) / (stopTime - startTime) } events/sec"
case s: String => persist(s) {
case Success(e) => onEvent(e)
case Success(e) =>
case Failure(e) => throw e
}
}

val onEvent: Receive = {
def onEvent = {
case "start" =>
startTime = System.nanoTime()
case "stop" =>
Expand All @@ -54,15 +54,15 @@ object EventsourcedActorThroughputSpec {
}

class Writer2(val id: String, val eventLog: ActorRef, collector: ActorRef) extends EventsourcedActor {
val onCommand: Receive = {
def onCommand = {
case s: String =>
persist(s) {
case Success(e) => collector ! e
case Failure(e) => throw e
}
}

val onEvent: Receive = {
def onEvent = {
case "ignore" =>
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,20 @@ import scala.util._

object EventsourcedProcessorIntegrationSpec {
class SampleActor(val id: String, val eventLog: ActorRef, probe: ActorRef) extends EventsourcedActor {
override val onCommand: Receive = {
override def onCommand = {
case s: String => persist(s) {
case Success(_) => onEvent(s)
case Success(_) =>
case Failure(_) =>
}
}

override val onEvent: Receive = {
override def onEvent = {
case s: String => probe ! ((s, lastVectorTimestamp))
}
}

class StatelessSampleProcessor(val id: String, val eventLog: ActorRef, val targetEventLog: ActorRef, eventProbe: ActorRef, progressProbe: ActorRef) extends EventsourcedProcessor {
override val onCommand: Receive = {
override def onCommand = {
case "boom" => throw boom
case "snap" => save("") {
case Success(_) => eventProbe ! "snapped"
Expand All @@ -55,7 +55,7 @@ object EventsourcedProcessorIntegrationSpec {
List(s"${s}-processed-1", s"${s}-processed-2")
}

override val onSnapshot: Receive = {
override def onSnapshot = {
case _ =>
}

Expand Down
4 changes: 2 additions & 2 deletions src/it/scala/com/rbmhtechnology/eventuate/RecoverySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ object RecoverySpec {
class ConvergenceView(val id: String, val eventLog: ActorRef, expectedSize: Int, probe: ActorRef) extends EventsourcedView {
var state: Set[String] = Set()

val onCommand: Receive = {
def onCommand = {
case _ =>
}

val onEvent: Receive = {
def onEvent = {
case s: String =>
state += s
if (state.size == expectedSize) probe ! state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import com.rbmhtechnology.eventuate.log.leveldb._
import org.cassandraunit.utils.EmbeddedCassandraServerHelper
import org.scalatest._

import scala.concurrent.Future

abstract class ReplicationCycleSpec extends WordSpec with Matchers with ReplicationNodeRegistry {
import ReplicationIntegrationSpec._

Expand Down Expand Up @@ -70,9 +72,11 @@ abstract class ReplicationCycleSpec extends WordSpec with Matchers with Replicat
val expectedC = 2 to num map { i => s"c$i"}
val all = expectedA ++ expectedB ++ expectedC

expectedA.foreach(s => actorA ! s)
expectedB.foreach(s => actorB ! s)
expectedC.foreach(s => actorC ! s)
import nodeA.system.dispatcher

Future(expectedA.foreach(s => actorA ! s))
Future(expectedB.foreach(s => actorB ! s))
Future(expectedC.foreach(s => actorC ! s))

probeA.expectMsgAllOf(all: _*)
probeB.expectMsgAllOf(all: _*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ object ReplicationIntegrationSpec {
class ReplicatedActor(val id: String, val eventLog: ActorRef, probe: ActorRef) extends EventsourcedActor {
override val stateSync = false

val onCommand: Receive = {
def onCommand = {
case s: String => persist(s) {
case Success(e) => onEvent(e)
case Success(e) =>
case Failure(e) => throw e
}
}

val onEvent: Receive = {
def onEvent = {
case s: String => probe ! s
}
}
Expand Down
Loading

1 comment on commit dfb35b5

@volkerstampa
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Please sign in to comment.