Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Commit

Permalink
Merge pull request #261 from RBMHTechnology/wip-260-disaster-recovery…
Browse files Browse the repository at this point in the history
…-special-exception

Failed disaster recovery should indicate whether partial updates have been made
  • Loading branch information
krasserm committed May 13, 2016
2 parents 53f377d + f694ae3 commit 8d89967
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ import scala.collection.immutable.Seq
import scala.concurrent._
import scala.concurrent.duration._

/**
* [[ReplicationEndpoint.recover]] completes with this exception if recovery fails.
*
* @param cause Recovery failure cause.
* @param partialUpdate Set to `true` if recovery already made partial updates, `false` if recovery
* failed without having made partial updates to replication partners.
*/
class RecoveryException(cause: Throwable, val partialUpdate: Boolean) extends RuntimeException(cause)

private class RecoverySettings(config: Config) {
val remoteOperationRetryMax: Int =
config.getInt("eventuate.log.recovery.remote-operation-retry-max")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,13 @@ class ReplicationEndpoint(
* have been lost.
*
* This procedure requires that event replication between this and directly connected endpoints is bi-directional
* and that these endpoints are available during recovery. Recovery is idempotent and must be repeated in failure
* cases by the application. After successful recovery the endpoint is automatically activated.
* Activating this endpoint without having successfully recovered from partial or total event
* loss may result in inconsistent replica states.
* and that these endpoints are available during recovery. After successful recovery the endpoint is automatically
* activated. A failed recovery completes with a [[RecoveryException]] and must be retried. Activating this endpoint
* without having successfully recovered from partial or total event loss may result in inconsistent replica states.
*
* Running a recovery on an endpoint that didn't loose events has no effect but may still fail due to unavailable
* replication partners, for example. In this case, a recovery retry can be omitted if the `partialUpdate` field
* of [[RecoveryException]] is set to `false`.
*/
def recover(): Future[Unit] = {
if (connections.isEmpty)
Expand All @@ -281,11 +284,15 @@ class ReplicationEndpoint(
val promise = Promise[Unit]()
val recovery = new Recovery(this)

def recoveryFailure[U](partialUpdate: Boolean): PartialFunction[Throwable, Future[U]] = {
case t => Future.failed(new RecoveryException(t, partialUpdate))
}

val recoveryOperation = for {
infos <- recovery.readEndpointInfos
clocks <- recovery.readEventLogClocks
infos <- recovery.readEndpointInfos.recoverWith(recoveryFailure(partialUpdate = false))
clocks <- recovery.readEventLogClocks.recoverWith(recoveryFailure(partialUpdate = false))
links = recovery.recoveryLinks(infos, clocks)
_ <- recovery.deleteSnapshots(links)
_ <- recovery.deleteSnapshots(links).recoverWith(recoveryFailure(partialUpdate = true))
} yield acceptor ! Recover(links, promise)

recoveryOperation.onFailure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package com.rbmhtechnology.eventuate.crdt.japi

import java.util.concurrent.CompletionStage
import java.util.{Optional => JOption}
import java.util.{ Optional => JOption }

import akka.actor.{ActorRef, ActorSystem}
import akka.actor.{ ActorRef, ActorSystem }
import com.rbmhtechnology.eventuate.crdt.LWWRegister

import scala.compat.java8.OptionConverters._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ object DeleteEventsSpecLeveldb {
"""
|eventuate.log.replication.retry-delay = 1s
|eventuate.log.replication.remote-read-timeout = 2s
|eventuate.disaster-recovery.remote-operation-retry-max = 10
|eventuate.disaster-recovery.remote-operation-retry-delay = 1s
|eventuate.disaster-recovery.remote-operation-timeout = 1s
|eventuate.log.recovery.remote-operation-retry-max = 10
|eventuate.log.recovery.remote-operation-retry-delay = 1s
|eventuate.log.recovery.remote-operation-timeout = 1s
""".stripMargin)

val L1 = "L1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ object RecoverySpecLeveldb {
"""
|eventuate.log.replication.retry-delay = 1s
|eventuate.log.replication.remote-read-timeout = 2s
|eventuate.disaster-recovery.remote-operation-retry-max = 10
|eventuate.disaster-recovery.remote-operation-retry-delay = 1s
|eventuate.disaster-recovery.remote-operation-timeout = 1s
|eventuate.log.recovery.remote-operation-retry-max = 10
|eventuate.log.recovery.remote-operation-retry-delay = 1s
|eventuate.log.recovery.remote-operation-timeout = 1s
""".stripMargin)

def rootDirectory(target: ReplicationTarget): File =
Expand Down Expand Up @@ -97,10 +97,14 @@ class RecoverySpecLeveldb extends WordSpec with Matchers with MultiLocationSpecL
an [IllegalStateException] shouldBe thrownBy(endpointA.activate())
}
"fail when connected endpoint is unavailable" in {
val locationA = location("A", customConfig = ConfigFactory.parseString("eventuate.disaster-recovery.remote-operation-retry-max = 0").withFallback(RecoverySpecLeveldb.config))
val endpointA = locationA.endpoint(Set("L1"), Set(replicationConnection(customPort)))
val locationA = location("A", customConfig = ConfigFactory.parseString("eventuate.log.recovery.remote-operation-retry-max = 0").withFallback(RecoverySpecLeveldb.config))
val endpointA = locationA.endpoint(Set("L1"), Set(replicationConnection(customPort)), activate = false)

an [Exception] shouldBe thrownBy(endpointA.recover().await)
val recoveryException = intercept[RecoveryException] {
endpointA.recover().await
}

recoveryException.partialUpdate should be(false)
}
"succeed normally if the endpoint was healthy (but not convergent yet)" in {
val locationB = location("B", customConfig = RecoverySpecLeveldb.config)
Expand Down

0 comments on commit 8d89967

Please sign in to comment.