From f5bedfaac62b71f5ca8e77639b739cd87a0c583d Mon Sep 17 00:00:00 2001 From: Selvaram Ganesh <gsriram7@gmail.com> Date: Tue, 29 Nov 2016 19:54:19 +0530 Subject: [PATCH 1/4] Introducing the notion of shards as an encapsulation of primary and secondary partitions --- .../suuchi/partitioner/CHRWithShards.scala | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala new file mode 100644 index 0000000..b6f415e --- /dev/null +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala @@ -0,0 +1,53 @@ +package in.ashwanthkumar.suuchi.partitioner + +import java.util + +import in.ashwanthkumar.suuchi.cluster.MemberAddress + +case class Shard(primaryPartition: VNode, follower: List[VNode]) { + def key = primaryPartition.node.host + "_" + primaryPartition.node.port + "_" + primaryPartition.nodeReplicaId + def getNodes = primaryPartition.node :: follower.map(_.node) +} + +class CHRWithShards(hashFn: Hash, partitionsPerNode: Int, replicationFactor: Int = 2) { + val sortedMap = new util.TreeMap[Integer, Shard]() + + val MAX_DUPES = 10 + + def init(nodes: List[MemberAddress]) = { + nodes.foreach(node => add(node, nodes)) + this + } + + private def hash(shard: Shard): Int = hashFn.hash(shard.key.getBytes) + + def followers(nodes: List[MemberAddress]): List[VNode] = { + nodes.flatMap(node => (1 until replicationFactor).map(i => VNode(node, i, Follower))) + } + + def add(node: MemberAddress, nodes: List[MemberAddress]) = { + (1 to partitionsPerNode).map(i => Shard(VNode(node, 1, Primary), followers(nodes.filterNot(_.equals(node))))).foreach{shard => + sortedMap.put(hash(shard), shard) + } + this + } + + def findCandidate(hash: Integer) = { + if (sortedMap.containsKey(hash)) hash -> sortedMap.get(hash) + else { + val tailMap = sortedMap.tailMap(hash) + val newHash = if (tailMap.isEmpty) sortedMap.firstKey() else tailMap.firstKey() + newHash -> sortedMap.get(newHash) + } + } + + def find(key: Array[Byte]): Option[Shard] = { + if (sortedMap.isEmpty) return None + + val hashIdx = hashFn.hash(key) + val (newHash, candidate) = findCandidate(hashIdx) + + candidate.getNodes + } + +} From 932513a19462ebb20c4b7ef6913ddb8fd83aef94 Mon Sep 17 00:00:00 2001 From: Selvaram Ganesh <gsriram7@gmail.com> Date: Tue, 29 Nov 2016 19:56:29 +0530 Subject: [PATCH 2/4] Fixing a compilation error in CHRWithShards --- .../in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala index b6f415e..e7dd120 100644 --- a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala @@ -41,8 +41,8 @@ class CHRWithShards(hashFn: Hash, partitionsPerNode: Int, replicationFactor: Int } } - def find(key: Array[Byte]): Option[Shard] = { - if (sortedMap.isEmpty) return None + def find(key: Array[Byte]): List[MemberAddress] = { + if (sortedMap.isEmpty) return Nil val hashIdx = hashFn.hash(key) val (newHash, candidate) = findCandidate(hashIdx) From 8bdc42f17704abc8f148b8f0998b233abeb97ba5 Mon Sep 17 00:00:00 2001 From: Selvaram Ganesh <gsriram7@gmail.com> Date: Tue, 29 Nov 2016 23:52:17 +0530 Subject: [PATCH 3/4] Removing unused variables Added a FIXME wrt follower allocation --- .../suuchi/partitioner/CHRWithShards.scala | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala index e7dd120..9f34566 100644 --- a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala @@ -6,14 +6,13 @@ import in.ashwanthkumar.suuchi.cluster.MemberAddress case class Shard(primaryPartition: VNode, follower: List[VNode]) { def key = primaryPartition.node.host + "_" + primaryPartition.node.port + "_" + primaryPartition.nodeReplicaId + def getNodes = primaryPartition.node :: follower.map(_.node) } class CHRWithShards(hashFn: Hash, partitionsPerNode: Int, replicationFactor: Int = 2) { val sortedMap = new util.TreeMap[Integer, Shard]() - val MAX_DUPES = 10 - def init(nodes: List[MemberAddress]) = { nodes.foreach(node => add(node, nodes)) this @@ -26,28 +25,28 @@ class CHRWithShards(hashFn: Hash, partitionsPerNode: Int, replicationFactor: Int } def add(node: MemberAddress, nodes: List[MemberAddress]) = { - (1 to partitionsPerNode).map(i => Shard(VNode(node, 1, Primary), followers(nodes.filterNot(_.equals(node))))).foreach{shard => + // FIXME: followers based on nodes.filterNot will lead to a skew in first elements in the list + (1 to partitionsPerNode).map(i => Shard(VNode(node, 1, Primary), followers(nodes.filterNot(_.equals(node))))).foreach { shard => sortedMap.put(hash(shard), shard) } this } - def findCandidate(hash: Integer) = { - if (sortedMap.containsKey(hash)) hash -> sortedMap.get(hash) - else { - val tailMap = sortedMap.tailMap(hash) - val newHash = if (tailMap.isEmpty) sortedMap.firstKey() else tailMap.firstKey() - newHash -> sortedMap.get(newHash) - } + def findCandidate(hash: Integer): Shard = { + if (sortedMap.containsKey(hash)) return sortedMap.get(hash) + val tailMap = sortedMap.tailMap(hash) + val newHash = if (tailMap.isEmpty) sortedMap.firstKey() else tailMap.firstKey() + sortedMap.get(newHash) } def find(key: Array[Byte]): List[MemberAddress] = { if (sortedMap.isEmpty) return Nil val hashIdx = hashFn.hash(key) - val (newHash, candidate) = findCandidate(hashIdx) + + val candidates = findCandidate(hashIdx) - candidate.getNodes + candidates.getNodes } } From 282dc1dbc2e58727eb8059caf595f1d1457c6c4b Mon Sep 17 00:00:00 2001 From: Selvaram Ganesh <gsriram7@gmail.com> Date: Thu, 1 Dec 2016 19:18:40 +0530 Subject: [PATCH 4/4] Using a priority queue approach to assign followers to shards Added tests for NodePriority P.S More tests need to be added to CHRWithShards and method and class names of NodePriority has to be refactored --- .../suuchi/partitioner/CHRWithShards.scala | 31 +++-- .../suuchi/partitioner/NodePriority.scala | 54 +++++++++ .../partitioner/CHRWithShardsTest.scala | 44 +++++++ .../suuchi/partitioner/NodePriorityTest.scala | 114 ++++++++++++++++++ .../suuchi/utils/DistinctMatcher.scala | 9 ++ 5 files changed, 241 insertions(+), 11 deletions(-) create mode 100644 suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/NodePriority.scala create mode 100644 suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShardsTest.scala create mode 100644 suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/partitioner/NodePriorityTest.scala create mode 100644 suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/utils/DistinctMatcher.scala diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala index 9f34566..fbcd065 100644 --- a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala @@ -4,29 +4,30 @@ import java.util import in.ashwanthkumar.suuchi.cluster.MemberAddress -case class Shard(primaryPartition: VNode, follower: List[VNode]) { - def key = primaryPartition.node.host + "_" + primaryPartition.node.port + "_" + primaryPartition.nodeReplicaId +case class Shard(id: Int, primaryPartition: VNode, followers: List[VNode]) { + def key = id + "_" + primaryPartition.node.host + "_" + primaryPartition.node.port - def getNodes = primaryPartition.node :: follower.map(_.node) + def getNodes = primaryPartition.node :: followers.map(_.node) } -class CHRWithShards(hashFn: Hash, partitionsPerNode: Int, replicationFactor: Int = 2) { - val sortedMap = new util.TreeMap[Integer, Shard]() +class CHRWithShards(hashFn: Hash, primaryPartitionsPerNode: Int, replicationFactor: Int = 2) { + private val sortedMap = new util.TreeMap[Integer, Shard]() + private val priority = new NodePriority def init(nodes: List[MemberAddress]) = { - nodes.foreach(node => add(node, nodes)) + priority.init(nodes) + nodes.foreach(node => add(node)) this } private def hash(shard: Shard): Int = hashFn.hash(shard.key.getBytes) - def followers(nodes: List[MemberAddress]): List[VNode] = { - nodes.flatMap(node => (1 until replicationFactor).map(i => VNode(node, i, Follower))) + def followers(node: MemberAddress): List[VNode] = { + priority.followers(replicationFactor - 1, node).map(address => VNode(address, 1, Follower)) } - def add(node: MemberAddress, nodes: List[MemberAddress]) = { - // FIXME: followers based on nodes.filterNot will lead to a skew in first elements in the list - (1 to partitionsPerNode).map(i => Shard(VNode(node, 1, Primary), followers(nodes.filterNot(_.equals(node))))).foreach { shard => + private def add(node: MemberAddress) = { + (1 to primaryPartitionsPerNode).map(i => Shard(i, VNode(node, 1, Primary), followers(node))).foreach { shard => sortedMap.put(hash(shard), shard) } this @@ -49,4 +50,12 @@ class CHRWithShards(hashFn: Hash, partitionsPerNode: Int, replicationFactor: Int candidates.getNodes } + // USED ONLY FOR TESTS + private[partitioner] def shards = sortedMap.values() + +} + +object CHRWithShards { + def apply(hashFn: Hash, nodes: List[MemberAddress], primaryPartitionsPerNode: Int, replicationFactor: Int): CHRWithShards = new CHRWithShards(hashFn, primaryPartitionsPerNode, replicationFactor).init(nodes) + def apply(nodes: List[MemberAddress], primaryPartitionsPerNode: Int, replicationFactor: Int): CHRWithShards = apply(SuuchiHash, nodes, primaryPartitionsPerNode, replicationFactor) } diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/NodePriority.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/NodePriority.scala new file mode 100644 index 0000000..ee62597 --- /dev/null +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/NodePriority.scala @@ -0,0 +1,54 @@ +package in.ashwanthkumar.suuchi.partitioner + +import in.ashwanthkumar.suuchi.cluster.MemberAddress + +import scala.collection.mutable + +case class WeightedNode(node: MemberAddress, weight: Int) extends Ordered[WeightedNode] { + override def compare(that: WeightedNode): Int = this.weight.compareTo(that.weight) +} + +class NodePriority { + private val queue = mutable.PriorityQueue.empty[WeightedNode] + private val initialWeight = Integer.MAX_VALUE + + def init(nodes: List[MemberAddress]) = { + nodes.foreach(node => queue.enqueue(WeightedNode(node, initialWeight))) + } + + def add(node: MemberAddress) = { + queue.enqueue(WeightedNode(node, initialWeight)) + } + + def add(weightedNode: WeightedNode) = { + queue.enqueue(weightedNode) + } + + def dePrioritize(weightedNode: WeightedNode) = { + queue.enqueue(WeightedNode(weightedNode.node, weightedNode.weight - 1)) + } + + def allFollowers(number: Int, currentNode: MemberAddress, followers: List[MemberAddress], adjustQueue: List[WeightedNode]): List[MemberAddress] = { + if (number == 0) { + adjustQueue.foreach(add) + followers + } + else { + val weightedNode = queue.dequeue() + if (weightedNode.node.equals(currentNode) || followers.contains(weightedNode.node)) { + allFollowers(number, currentNode, followers, weightedNode :: adjustQueue) + } + else { + dePrioritize(weightedNode) + allFollowers(number - 1, currentNode, weightedNode.node :: followers, adjustQueue) + } + } + } + + def followers(numberOfFollowers: Int, node: MemberAddress) = allFollowers(numberOfFollowers, node, List(), List()) + + def values() = queue.toList + + def dequeue() = queue.dequeue() + +} diff --git a/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShardsTest.scala b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShardsTest.scala new file mode 100644 index 0000000..b0bb779 --- /dev/null +++ b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShardsTest.scala @@ -0,0 +1,44 @@ +package in.ashwanthkumar.suuchi.partitioner + +import in.ashwanthkumar.suuchi.cluster.MemberAddress +import in.ashwanthkumar.suuchi.utils.DistinctMatcher +import org.scalatest.FlatSpec +import org.scalatest.Matchers._ + +import scala.collection.JavaConversions._ + +class CHRWithShardsTest extends FlatSpec { + + val distinct = new DistinctMatcher[MemberAddress] + + def printRing(ring: CHRWithShards)= ring.shards.toList.sortBy(_.id).foreach(s => println(s"Id: ${s.id} Key: ${s.key} Primary: ${s.primaryPartition.node.toExternalForm} Followers: ${s.followers.map(_.node.toExternalForm)}")) + + "CHRWithShards" should "pin nodes into the ring accounting for shards" in { + val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3), MemberAddress("host4", 4), MemberAddress("host5", 5)) + val ring = CHRWithShards(nodes, primaryPartitionsPerNode = 2, replicationFactor = 3) + + ring.shards should have size 10 +// printRing(ring) + } + + it should "not a node as a follower of itself" in { + val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3)) + val ring = CHRWithShards(nodes, 1, 3) + + val shards = ring.shards.toList + shards.foreach{ shard => + shard.followers should not contain shard.primaryPartition.node + } + } + + it should "have shards with distinct number nodes to replicate data" in { + val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3)) + val ring = CHRWithShards(nodes, 1, 3) + + val shards = ring.shards.toList + + shards.foreach{shard => + shard.getNodes shouldBe distinct + } + } +} diff --git a/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/partitioner/NodePriorityTest.scala b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/partitioner/NodePriorityTest.scala new file mode 100644 index 0000000..de1b53d --- /dev/null +++ b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/partitioner/NodePriorityTest.scala @@ -0,0 +1,114 @@ +package in.ashwanthkumar.suuchi.partitioner + +import in.ashwanthkumar.suuchi.cluster.MemberAddress +import in.ashwanthkumar.suuchi.utils.DistinctMatcher +import org.scalatest.FlatSpec +import org.scalatest.Matchers._ + +import scala.collection.mutable + +class NodePriorityTest extends FlatSpec { + + val distinct = new DistinctMatcher[MemberAddress] + + "NodePriority" should "init the queue with nodes" in { + val priorityQueue = new NodePriority() + val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3)) + + priorityQueue.init(nodes) + + priorityQueue.values() should have size 3 + + priorityQueue.add(MemberAddress("host4", 4)) + + priorityQueue.values() should have size 4 + + priorityQueue.add(WeightedNode(MemberAddress("host5", 5), 10)) + + priorityQueue.values() should have size 5 + } + + it should "decrease a node's weight when it is de-prioritized" in { + val priorityQueue = new NodePriority() + val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3)) + + priorityQueue.init(nodes) + val weightedNode = priorityQueue.dequeue() + + priorityQueue.dePrioritize(weightedNode) + + priorityQueue.values().filter(_.node.equals(weightedNode.node)).head.weight should be < weightedNode.weight + } + + it should "give nodes in prioritized order when de-queued" in { + val priorityQueue = new NodePriority() + val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3)) + val leastWeighed = WeightedNode(MemberAddress("host4", 4), 4) + + priorityQueue.init(nodes) + val weightedNode = priorityQueue.dequeue() + priorityQueue.dePrioritize(weightedNode) + priorityQueue.add(leastWeighed) + + priorityQueue.dequeue().node should not be weightedNode.node + priorityQueue.dequeue().node should not be weightedNode.node + priorityQueue.dequeue().node should be (weightedNode.node) + priorityQueue.dequeue().node should be (leastWeighed.node) + } + + it should "return back the requested number of follower nodes for a given node based on priority to support even distribution of nodes" in { + val priorityQueue = new NodePriority() + val initialWeight = 10 + (1 to 10).map(i => WeightedNode(MemberAddress(s"host$i", i), initialWeight)).foreach(priorityQueue.add) + val followersAssignedSoFar = mutable.Set[MemberAddress]() + + val node1Followers = priorityQueue.followers(2, MemberAddress("host1", 1)).toSet + node1Followers should have size 2 + followersAssignedSoFar.intersect(node1Followers) shouldBe empty + followersAssignedSoFar ++= node1Followers + + val node2Followers = priorityQueue.followers(2, MemberAddress("host2", 2)).toSet + node2Followers should have size 2 + followersAssignedSoFar.intersect(node2Followers) shouldBe empty + followersAssignedSoFar ++= node2Followers + + val node3Followers = priorityQueue.followers(2, MemberAddress("host3", 3)).toSet + node3Followers should have size 2 + followersAssignedSoFar.intersect(node3Followers) shouldBe empty + followersAssignedSoFar ++= node3Followers + + val node4Followers = priorityQueue.followers(2, MemberAddress("host4", 4)).toSet + node4Followers should have size 2 + followersAssignedSoFar.intersect(node4Followers) shouldBe empty + followersAssignedSoFar ++= node4Followers + + val node5Followers = priorityQueue.followers(2, MemberAddress("host5", 5)).toSet + node5Followers should have size 2 + followersAssignedSoFar.intersect(node5Followers) shouldBe empty + followersAssignedSoFar ++= node5Followers + + followersAssignedSoFar should have size 10 + } + + it should "not return a node as a follower of itself" in { + val priorityQueue = new NodePriority() + val initialWeight = 10 + (1 to 10).map(i => WeightedNode(MemberAddress(s"host$i", i), initialWeight)).foreach(priorityQueue.add) + + priorityQueue.followers(9, MemberAddress("host1", 1)) shouldNot contain (MemberAddress("host1", 1)) + priorityQueue.followers(9, MemberAddress("host5", 5)) shouldNot contain (MemberAddress("host5", 5)) + priorityQueue.followers(9, MemberAddress("host10", 10)) shouldNot contain (MemberAddress("host10", 10)) + } + + it should "followers should be distinct, ie, there shouldn't be a duplicate entry in the result even though the next de-queued item has the highest priority" in { + val priorityQueue = new NodePriority() + val initialWeight = 10 + (1 to 10).map(i => WeightedNode(MemberAddress(s"host$i", i), initialWeight)).foreach(priorityQueue.add) + priorityQueue.followers(3, MemberAddress("host1", 1)) shouldBe distinct + priorityQueue.followers(3, MemberAddress("host2", 2)) shouldBe distinct + priorityQueue.followers(3, MemberAddress("host3", 3)) shouldBe distinct + priorityQueue.followers(3, MemberAddress("host4", 4)) shouldBe distinct + priorityQueue.followers(3, MemberAddress("host5", 5)) shouldBe distinct + } + +} diff --git a/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/utils/DistinctMatcher.scala b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/utils/DistinctMatcher.scala new file mode 100644 index 0000000..abb9481 --- /dev/null +++ b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/utils/DistinctMatcher.scala @@ -0,0 +1,9 @@ +package in.ashwanthkumar.suuchi.utils + +import org.scalatest.matchers.{BeMatcher, MatchResult} + +class DistinctMatcher[T] extends BeMatcher[List[T]] { + override def apply(left: List[T]): MatchResult = { + MatchResult(left.distinct.size == left.size, "is not distinct", "is distinct") + } +}