Skip to content

Commit

Permalink
Considering replication factor to tag partitions with partition type
Browse files Browse the repository at this point in the history
  • Loading branch information
gsriram7 committed Nov 28, 2016
1 parent 5fcfc91 commit 733e64b
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import scala.collection.mutable

sealed trait PartitionType
object PartitionType {
def apply(index: Int) = {
if (index == 1) Primary
def apply(index: Int, replicationFactor: Int) = {
if (index % replicationFactor == 1) Primary
else Follower
}
}
Expand Down Expand Up @@ -44,14 +44,14 @@ class ConsistentHashRing(hashFn: Hash, partitionsPerNode: Int, replicationFactor
private def hash(vnode: VNode): Int = hashFn.hash(vnode.key.getBytes)

def add(node: MemberAddress) = {
(1 to partitionsPerNode).map(i => VNode(node, i, PartitionType(i))).foreach { vnode =>
(1 to partitionsPerNode).map(i => VNode(node, i, PartitionType(i, replicationFactor))).foreach { vnode =>
sortedMap.put(hash(vnode), vnode)
}
this
}

def remove(node: MemberAddress) = {
(1 to partitionsPerNode).map(i => VNode(node, i, PartitionType(i))).foreach { vnode =>
(1 to partitionsPerNode).map(i => VNode(node, i, PartitionType(i, replicationFactor))).foreach { vnode =>
sortedMap.remove(hash(vnode))
}
this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,17 @@ class ConsistentHashRingSpec extends FlatSpec {

val vNodes = ring.sortedMap.values().toList

vNodes.filter(_.pType.equals(Primary)) should have size 3
vNodes.filter(_.pType.equals(Follower)) should have size 6
vNodes.filter(_.pType.equals(Primary)) should have size 6
vNodes.filter(_.pType.equals(Follower)) should have size 3
}

it should "set vNode with replica id 1 as Primary partition, irrespective of replication factor" in {
val ring = new ConsistentHashRing(SuuchiHash, 2)
ring.init(List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3)))
it should "set a primary partition even if #partitions is less than rpf" in {
val ring = new ConsistentHashRing(SuuchiHash, 2, replicationFactor = 2 + 1)
val hosts = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3))
ring.init(hosts)

val vNodes = ring.sortedMap.values().toList

vNodes.filter(_.pType.equals(Primary)).map(_.nodeReplicaId).reduce(_ & _) should be(1)
vNodes.filter(_.pType.equals(Primary)) should have size 3
}
}

0 comments on commit 733e64b

Please sign in to comment.