Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Commit

Permalink
Update Cassandra index on AdjustEventLogClock
Browse files Browse the repository at this point in the history
When writing the adjusted event log clock after disaster recovery also
update the index up to the last sequence nr to ensure that the written
event log clock snapshot is consistent with the index.

Before the index-actor informed the log-actor with a message about
an up-to-date index. This has been replaces by a promise so that
its future can be returned. The alternative of using and ask would
have required to specify an timeout, however it is hard to define
a reasonable default for this timeout.

Closes #393
  • Loading branch information
volkerstampa committed Jun 30, 2017
1 parent 18192fa commit 9360c1c
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ class Location(val id: String, logFactory: String => Props, customPort: Int, cus
val probe: TestProbe =
new TestProbe(system)

def listener(eventLog: ActorRef): EventListener =
new EventListener(id, eventLog)(system)
def listener(eventLog: ActorRef, aggregateId: Option[String] = None): EventListener =
new EventListener(id, eventLog, aggregateId)(system)

def endpoint(
logNames: Set[String],
Expand All @@ -197,11 +197,14 @@ class Location(val id: String, logFactory: String => Props, customPort: Int, cus
}

object Location {
class EventListener(locationId: String, eventLog: ActorRef)(implicit system: ActorSystem) extends TestProbe(system, s"EventListener-$locationId") { listener =>
class EventListener(locationId: String, eventLog: ActorRef, aggregateId: Option[String])(implicit system: ActorSystem) extends TestProbe(system, s"EventListener-$locationId") { listener =>
private class EventListenerView extends EventsourcedView {
override val id =
testActorName

override def aggregateId =
listener.aggregateId

override val eventLog =
listener.eventLog

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ package object utilities {
def await: T = Await.result(awaitable, timeoutDuration)
}

def write(target: ReplicationTarget, events: Seq[String]): Unit = {
def write(target: ReplicationTarget, events: Seq[String], aggregateId: Option[String] = None): Unit = {
val system = target.endpoint.system
val probe = TestProbe()(system)
target.log ! Write(events.map(DurableEvent(_, target.logId)), system.deadLetters, probe.ref, 0, 0)
target.log ! Write(events.map(DurableEvent(_, target.logId, emitterAggregateId = aggregateId)), system.deadLetters, probe.ref, 0, 0)
probe.expectMsgClass(classOf[WriteSuccess])
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2015 - 2016 Red Bull Media House GmbH <http://www.redbullmediahouse.com> - all rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.rbmhtechnology.eventuate

import com.rbmhtechnology.eventuate.ReplicationIntegrationSpec.replicationConnection
import com.rbmhtechnology.eventuate.utilities._
import org.scalatest.Matchers
import org.scalatest.WordSpec

class RecoverySpecCassandra extends WordSpec with Matchers with MultiLocationSpecCassandra {
"ReplicationEndpoint recovery" must {
"leave the index in an consistent state" in { // test for issue #393
def newLocationA = location("A")
val locationA1 = newLocationA
val locationB = location("B")

def newEndpointA(l: Location) = l.endpoint(Set("L1"), Set(replicationConnection(locationB.port)), activate = false)
val endpointA1 = newEndpointA(locationA1)
locationB.endpoint(Set("L1"), Set(replicationConnection(locationA1.port)))

val logA = endpointA1.target("L1")
write(logA, List("1"), Some("A1"))

endpointA1.recover().await
locationA1.terminate().await

val locationA2 = newLocationA
val endpointA2 = newEndpointA(locationA2)
locationA2.listener(endpointA2.logs("L1"), Some("A1")).waitForMessage("1")
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ class EventLogSpecCassandra extends TestKit(ActorSystem("test", EventLogSpecCass
expectReplay(Some("a2"), "b", "d")
expectReplay(Some("a3"), "f")
}
"updates the index on AdjustEventLogClock request" in {
writeEmittedEvents(List(event("a"), event("b")))
(log ? AdjustEventLogClock).await

indexProbe.expectMsg(UpdateIndexSuccess(EventLogClock(sequenceNr = 2L, versionVector = timestamp(2L)), 1))
}
"replay aggregate events from log" in {
writeEmittedEvents(List(
event("a", Some("a1")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.rbmhtechnology.eventuate.log.cassandra
import java.io.Closeable

import akka.actor._
import akka.pattern.pipe

import com.datastax.driver.core.QueryOptions
import com.datastax.driver.core.exceptions._
Expand Down Expand Up @@ -136,7 +137,7 @@ class CassandraEventLog(id: String, aggregateIndexing: Boolean) extends EventLog

override def writeEventLogClockSnapshot(clock: EventLogClock): Future[Unit] = {
import context.dispatcher
indexStore.writeEventLogClockSnapshotAsync(clock).map(_ => ())
updateIndex(clock).flatMap(_ => indexStore.writeEventLogClockSnapshotAsync(clock)).map(_ => ())
}

override def write(events: Seq[DurableEvent], partition: Long, clock: EventLogClock): Unit =
Expand Down Expand Up @@ -183,25 +184,28 @@ class CassandraEventLog(id: String, aggregateIndexing: Boolean) extends EventLog
eventLogStore.write(events, partition)
updateCount += events.size
if (updateCount >= cassandra.settings.indexUpdateLimit) {
if (aggregateIndexing) {
// asynchronously update the index
index ! UpdateIndex(null, clock.sequenceNr)
} else {
// otherwise update the event log clock snapshot only
import context.dispatcher
indexStore.writeEventLogClockSnapshotAsync(clock)
}

updateIndex(clock)
updateCount = 0L
}
}

private def updateIndex(clock: EventLogClock): Future[EventLogClock] = {
import context.dispatcher
if (aggregateIndexing) {
// asynchronously update the index
val promise = Promise[UpdateIndexSuccess]
index ! UpdateIndex(null, clock.sequenceNr, promise)
promise.future.pipeTo(self)
promise.future.map(_.clock)
} else
// otherwise update the event log clock snapshot only
indexStore.writeEventLogClockSnapshotAsync(clock)
}

override def unhandled(message: Any): Unit = message match {
case u @ UpdateIndexSuccess(clock, _) =>
indexSequenceNr = clock.sequenceNr
onIndexEvent(u)
case u @ UpdateIndexFailure(_) =>
onIndexEvent(u)
case other =>
super.unhandled(other)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.rbmhtechnology.eventuate.log.cassandra

import akka.actor._
import akka.pattern.pipe

import com.rbmhtechnology.eventuate._
import com.rbmhtechnology.eventuate.log.EventLogClock
Expand All @@ -25,11 +26,9 @@ import scala.collection.immutable.Seq
import scala.concurrent._
import scala.util._

private[eventuate] class CassandraIndex(cassandra: Cassandra, eventLogClock: EventLogClock, eventLogStore: CassandraEventLogStore, indexStore: CassandraIndexStore, logId: String) extends Actor with Stash with ActorLogging {
private[eventuate] class CassandraIndex(cassandra: Cassandra, eventLogClock: EventLogClock, eventLogStore: CassandraEventLogStore, indexStore: CassandraIndexStore, logId: String) extends Actor {
import CassandraIndex._

private val scheduler = context.system.scheduler
private val eventLog = context.parent
import context.dispatcher

private val indexUpdater = context.actorOf(Props(new CassandraIndexUpdater(cassandra, eventLogStore, indexStore)))

Expand All @@ -39,20 +38,17 @@ private[eventuate] class CassandraIndex(cassandra: Cassandra, eventLogClock: Eve
*/
private var clock: EventLogClock = eventLogClock

def receive = {
case UpdateIndex(_, toSequenceNr) =>
indexUpdater ! UpdateIndex(clock, toSequenceNr)
case u @ UpdateIndexSuccess(t, _) =>
override def receive = {
case UpdateIndex(_, toSequenceNr, promise) =>
indexUpdater ! UpdateIndex(clock, toSequenceNr, promise)
promise.future.pipeTo(self)
case UpdateIndexSuccess(t, _) =>
clock = t
eventLog ! u
case u @ UpdateIndexFailure(cause) =>
log.error(cause, "UpdateIndex failure")
eventLog ! u
}
}

private[eventuate] object CassandraIndex {
case class UpdateIndex(clock: EventLogClock, toSequenceNr: Long)
case class UpdateIndex(clock: EventLogClock, toSequenceNr: Long, promise: Promise[UpdateIndexSuccess])
case class UpdateIndexProgress(increment: IndexIncrement)
case class UpdateIndexSuccess(clock: EventLogClock, steps: Int = 0)
case class UpdateIndexFailure(cause: Throwable)
Expand Down Expand Up @@ -83,28 +79,27 @@ private[eventuate] object CassandraIndex {
Props(new CassandraIndex(cassandra, eventLogClock, eventLogStore, indexStore, logId))
}

private class CassandraIndexUpdater(cassandra: Cassandra, eventLogStore: CassandraEventLogStore, indexStore: CassandraIndexStore) extends Actor {
private class CassandraIndexUpdater(cassandra: Cassandra, eventLogStore: CassandraEventLogStore, indexStore: CassandraIndexStore) extends Actor with ActorLogging {
import CassandraIndex._
import context.dispatcher

val index = context.parent

val idle: Receive = {
case UpdateIndex(clock, toSequenceNr) =>
case UpdateIndex(clock, toSequenceNr, promise) =>
update(clock.sequenceNr + 1L, toSequenceNr, IndexIncrement(AggregateEvents(), clock))
context.become(updating(0, toSequenceNr))
context.become(updating(0, toSequenceNr, promise))
}

def updating(steps: Int, toSequenceNr: Long): Receive = {
def updating(steps: Int, toSequenceNr: Long, promise: Promise[UpdateIndexSuccess]): Receive = {
case UpdateIndexFailure(err) =>
index ! UpdateIndexFailure(err)
promise.failure(err)
log.error(err, "UpdateIndex failure")
context.become(idle)
case UpdateIndexSuccess(t, _) =>
index ! UpdateIndexSuccess(t, steps)
promise.success(UpdateIndexSuccess(t, steps))
context.become(idle)
case UpdateIndexProgress(inc) =>
update(inc.clock.sequenceNr + 1L, toSequenceNr, inc.clearAggregateEvents)
context.become(updating(steps + 1, toSequenceNr))
context.become(updating(steps + 1, toSequenceNr, promise))
}

def receive = idle
Expand Down

0 comments on commit 9360c1c

Please sign in to comment.