diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala
new file mode 100644
index 0000000..fbcd065
--- /dev/null
+++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala
@@ -0,0 +1,90 @@
+package in.ashwanthkumar.suuchi.partitioner
+
+import java.util
+
+import in.ashwanthkumar.suuchi.cluster.MemberAddress
+
+/**
+ * A shard placed on the ring: a primary partition together with the partitions replicating it.
+ *
+ * @param id               number of the shard among the shards of its primary node
+ * @param primaryPartition partition that owns the shard
+ * @param followers        partitions that hold the replicas of the shard
+ */
+case class Shard(id: Int, primaryPartition: VNode, followers: List[VNode]) {
+  /** Identity of the shard: `<id>_<host>_<port>`, host and port being those of the primary node. */
+  def key: String = s"${id}_${primaryPartition.node.host}_${primaryPartition.node.port}"
+
+  /** The primary node followed by the nodes of all followers. */
+  def getNodes: List[MemberAddress] = primaryPartition.node :: followers.map(_.node)
+}
+
+/**
+ * Consistent hash ring whose entries are [[Shard]]s: every node contributes
+ * `primaryPartitionsPerNode` shards, each asking for `replicationFactor - 1` followers.
+ *
+ * @param hashFn                   hash applied to shard keys and to lookup keys
+ * @param primaryPartitionsPerNode number of shards every node is the primary of
+ * @param replicationFactor        nodes per shard including the primary; must be at least 1
+ */
+class CHRWithShards(hashFn: Hash, primaryPartitionsPerNode: Int, replicationFactor: Int = 2) {
+  // A factor below 1 would ask NodePriority for a negative number of followers.
+  require(replicationFactor >= 1, s"replicationFactor must be >= 1 but was $replicationFactor")
+
+  private val sortedMap = new util.TreeMap[Integer, Shard]()
+  private val priority = new NodePriority
+
+  /** Registers `nodes` with the follower priority, adds the shards of every node and returns this ring. */
+  def init(nodes: List[MemberAddress]): CHRWithShards = {
+    priority.init(nodes)
+    nodes.foreach(node => add(node))
+    this
+  }
+
+  // Explicit charset: the platform default can differ from one JVM to the next, which would
+  // make the same shard key hash to different positions.
+  private def hash(shard: Shard): Int = hashFn.hash(shard.key.getBytes(java.nio.charset.StandardCharsets.UTF_8))
+
+  /** Picks the followers of `node` (`replicationFactor - 1` are requested) from the follower priority. */
+  def followers(node: MemberAddress): List[VNode] = {
+    priority.followers(replicationFactor - 1, node).map(address => VNode(address, 1, Follower))
+  }
+
+  // NOTE: shards whose keys hash to the same value replace one another in the map.
+  private def add(node: MemberAddress): CHRWithShards = {
+    (1 to primaryPartitionsPerNode).foreach { i =>
+      val shard = Shard(i, VNode(node, 1, Primary), followers(node))
+      sortedMap.put(hash(shard), shard)
+    }
+    this
+  }
+
+  /**
+   * Returns the shard stored at `hash` or, failing that, at the next higher hash, wrapping
+   * around to the lowest hash once the highest one is passed.
+   *
+   * @throws NoSuchElementException if the ring has no shards
+   */
+  def findCandidate(hash: Integer): Shard = {
+    // ceilingEntry covers the exact hit and the successor in one lookup; null means wrap around.
+    Option(sortedMap.ceilingEntry(hash))
+      .orElse(Option(sortedMap.firstEntry()))
+      .map(_.getValue)
+      .getOrElse(throw new NoSuchElementException("the ring has no shards"))
+  }
+
+  /** Nodes (primary first) of the shard found for the hash of `key`; `Nil` while the ring is empty. */
+  def find(key: Array[Byte]): List[MemberAddress] = {
+    if (sortedMap.isEmpty) Nil
+    else findCandidate(hashFn.hash(key)).getNodes
+  }
+
+  // USED ONLY FOR TESTS
+  private[partitioner] def shards: util.Collection[Shard] = sortedMap.values()
+
+}
+
+object CHRWithShards {
+  def apply(hashFn: Hash, nodes: List[MemberAddress], primaryPartitionsPerNode: Int, replicationFactor: Int): CHRWithShards = new CHRWithShards(hashFn, primaryPartitionsPerNode, replicationFactor).init(nodes)
+  def apply(nodes: List[MemberAddress], primaryPartitionsPerNode: Int, replicationFactor: Int): CHRWithShards = apply(SuuchiHash, nodes, primaryPartitionsPerNode, replicationFactor)
+}
diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/NodePriority.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/NodePriority.scala
new file mode 100644
index 0000000..ee62597
--- /dev/null
+++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/NodePriority.scala
@@ -0,0 +1,85 @@
+package in.ashwanthkumar.suuchi.partitioner
+
+import in.ashwanthkumar.suuchi.cluster.MemberAddress
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+
+/** A node with its weight; weighted nodes are ordered by weight alone. */
+case class WeightedNode(node: MemberAddress, weight: Int) extends Ordered[WeightedNode] {
+  override def compare(that: WeightedNode): Int = Integer.compare(this.weight, that.weight)
+}
+
+/**
+ * Hands out follower nodes from a priority queue in which the heaviest node comes first.
+ * A node chosen as a follower is put back with its weight reduced by one, so that nodes
+ * chosen less often are preferred by later requests.
+ */
+class NodePriority {
+  private val queue = mutable.PriorityQueue.empty[WeightedNode]
+  private val initialWeight = Integer.MAX_VALUE
+
+  /** Adds every node of `nodes` with the initial (maximum) weight. */
+  def init(nodes: List[MemberAddress]): Unit = {
+    nodes.foreach(node => add(node))
+  }
+
+  /** Adds `node` with the initial (maximum) weight. */
+  def add(node: MemberAddress): Unit = {
+    queue.enqueue(WeightedNode(node, initialWeight))
+  }
+
+  /** Adds `weightedNode` with the weight it carries. */
+  def add(weightedNode: WeightedNode): Unit = {
+    queue.enqueue(weightedNode)
+  }
+
+  /** Enqueues the node of `weightedNode` with its weight reduced by one; nothing is removed here. */
+  def dePrioritize(weightedNode: WeightedNode): Unit = {
+    queue.enqueue(WeightedNode(weightedNode.node, weightedNode.weight - 1))
+  }
+
+  /**
+   * Takes up to `number` further followers for `currentNode` off the queue.
+   *
+   * Nodes that are `currentNode` itself or already among the followers are set aside and put
+   * back unchanged once the selection is over. When the queue runs out before `number`
+   * followers were found, the followers found so far are returned instead of failing on the
+   * empty queue and losing the nodes that were set aside.
+   *
+   * @param number      followers still to pick; nothing is picked when it is not positive
+   * @param currentNode node the followers are picked for; never added to the result
+   * @param followers   followers picked so far, most recent first
+   * @param adjustQueue nodes set aside so far that have to go back into the queue
+   * @return the followers, most recently picked first
+   */
+  def allFollowers(number: Int, currentNode: MemberAddress, followers: List[MemberAddress], adjustQueue: List[WeightedNode]): List[MemberAddress] = {
+    @tailrec
+    def pick(remaining: Int, picked: List[MemberAddress], setAside: List[WeightedNode]): List[MemberAddress] = {
+      if (remaining <= 0 || queue.isEmpty) {
+        setAside.foreach(add)
+        picked
+      } else {
+        val candidate = queue.dequeue()
+        if (candidate.node == currentNode || picked.contains(candidate.node)) {
+          pick(remaining, picked, candidate :: setAside)
+        } else {
+          dePrioritize(candidate)
+          pick(remaining - 1, candidate.node :: picked, setAside)
+        }
+      }
+    }
+
+    pick(number, followers, adjustQueue)
+  }
+
+  /** Picks `numberOfFollowers` followers for `node`, fewer when the queue holds too few other nodes. */
+  def followers(numberOfFollowers: Int, node: MemberAddress): List[MemberAddress] = allFollowers(numberOfFollowers, node, List(), List())
+
+  /** The weighted nodes currently in the queue, as a list. */
+  def values(): List[WeightedNode] = queue.toList
+
+  /** Removes and returns the node with the highest weight. */
+  def dequeue(): WeightedNode = queue.dequeue()
+
+}
diff --git a/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShardsTest.scala b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShardsTest.scala
new file mode 100644
index 0000000..b0bb779
--- /dev/null
+++ b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShardsTest.scala
@@ -0,0 +1,45 @@
+package in.ashwanthkumar.suuchi.partitioner
+
+import in.ashwanthkumar.suuchi.cluster.MemberAddress
+import in.ashwanthkumar.suuchi.utils.DistinctMatcher
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers._
+
+import scala.collection.JavaConverters._
+
+class CHRWithShardsTest extends FlatSpec {
+
+  val distinct = new DistinctMatcher[MemberAddress]
+
+  // Debugging aid: prints every shard of the ring, ordered by shard id.
+  def printRing(ring: CHRWithShards): Unit = ring.shards.asScala.toList.sortBy(_.id).foreach { s =>
+    println(s"Id: ${s.id} Key: ${s.key} Primary: ${s.primaryPartition.node.toExternalForm} Followers: ${s.followers.map(_.node.toExternalForm)}")
+  }
+
+  "CHRWithShards" should "pin nodes into the ring accounting for shards" in {
+    val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3), MemberAddress("host4", 4), MemberAddress("host5", 5))
+    val ring = CHRWithShards(nodes, primaryPartitionsPerNode = 2, replicationFactor = 3)
+
+    ring.shards.asScala should have size 10
+//    printRing(ring)
+  }
+
+  it should "not have a node as a follower of itself" in {
+    val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3))
+    val ring = CHRWithShards(nodes, 1, 3)
+
+    ring.shards.asScala.foreach { shard =>
+      // followers are VNodes: compare their nodes, not the VNodes, with the primary's address
+      shard.followers.map(_.node) should not contain (shard.primaryPartition.node)
+    }
+  }
+
+  it should "have shards with distinct nodes to replicate data" in {
+    val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3))
+    val ring = CHRWithShards(nodes, 1, 3)
+
+    ring.shards.asScala.foreach { shard =>
+      shard.getNodes shouldBe distinct
+    }
+  }
+}
diff --git a/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/partitioner/NodePriorityTest.scala b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/partitioner/NodePriorityTest.scala
new file mode 100644
index 0000000..de1b53d
--- /dev/null
+++ b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/partitioner/NodePriorityTest.scala
@@ -0,0 +1,108 @@
+package in.ashwanthkumar.suuchi.partitioner
+
+import in.ashwanthkumar.suuchi.cluster.MemberAddress
+import in.ashwanthkumar.suuchi.utils.DistinctMatcher
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers._
+
+import scala.collection.mutable
+
+class NodePriorityTest extends FlatSpec {
+
+  val distinct = new DistinctMatcher[MemberAddress]
+
+  // A queue holding host1 .. host<count>, all with the same weight.
+  private def queueOf(count: Int, weight: Int = 10): NodePriority = {
+    val priorityQueue = new NodePriority()
+    (1 to count).map(i => WeightedNode(MemberAddress(s"host$i", i), weight)).foreach(priorityQueue.add)
+    priorityQueue
+  }
+
+  "NodePriority" should "init the queue with nodes" in {
+    val priorityQueue = new NodePriority()
+    val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3))
+
+    priorityQueue.init(nodes)
+
+    priorityQueue.values() should have size 3
+
+    priorityQueue.add(MemberAddress("host4", 4))
+
+    priorityQueue.values() should have size 4
+
+    priorityQueue.add(WeightedNode(MemberAddress("host5", 5), 10))
+
+    priorityQueue.values() should have size 5
+  }
+
+  it should "decrease a node's weight when it is de-prioritized" in {
+    val priorityQueue = new NodePriority()
+    val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3))
+
+    priorityQueue.init(nodes)
+    val weightedNode = priorityQueue.dequeue()
+
+    priorityQueue.dePrioritize(weightedNode)
+
+    // find + Option: a missing node fails the assertion instead of throwing from head
+    val weightAfter = priorityQueue.values().find(_.node == weightedNode.node).map(_.weight)
+    weightAfter shouldBe Some(weightedNode.weight - 1)
+  }
+
+  it should "give nodes in prioritized order when de-queued" in {
+    val priorityQueue = new NodePriority()
+    val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3))
+    val leastWeighed = WeightedNode(MemberAddress("host4", 4), 4)
+
+    priorityQueue.init(nodes)
+    val weightedNode = priorityQueue.dequeue()
+    priorityQueue.dePrioritize(weightedNode)
+    priorityQueue.add(leastWeighed)
+
+    priorityQueue.dequeue().node should not be weightedNode.node
+    priorityQueue.dequeue().node should not be weightedNode.node
+    priorityQueue.dequeue().node should be (weightedNode.node)
+    priorityQueue.dequeue().node should be (leastWeighed.node)
+  }
+
+  it should "return back the requested number of follower nodes for a given node based on priority to support even distribution of nodes" in {
+    val priorityQueue = queueOf(10)
+    val followersAssignedSoFar = mutable.Set[MemberAddress]()
+
+    // host1 .. host5 must each get two followers that were not handed out before
+    (1 to 5).foreach { i =>
+      val nodeFollowers = priorityQueue.followers(2, MemberAddress(s"host$i", i)).toSet
+      nodeFollowers should have size 2
+      followersAssignedSoFar.intersect(nodeFollowers) shouldBe empty
+      followersAssignedSoFar ++= nodeFollowers
+    }
+
+    followersAssignedSoFar should have size 10
+  }
+
+  it should "not return a node as a follower of itself" in {
+    val priorityQueue = queueOf(10)
+
+    priorityQueue.followers(9, MemberAddress("host1", 1)) shouldNot contain (MemberAddress("host1", 1))
+    priorityQueue.followers(9, MemberAddress("host5", 5)) shouldNot contain (MemberAddress("host5", 5))
+    priorityQueue.followers(9, MemberAddress("host10", 10)) shouldNot contain (MemberAddress("host10", 10))
+  }
+
+  it should "return distinct followers, i.e. no duplicate entry in the result even though the next de-queued item has the highest priority" in {
+    val priorityQueue = queueOf(10)
+
+    (1 to 5).foreach { i =>
+      priorityQueue.followers(3, MemberAddress(s"host$i", i)) shouldBe distinct
+    }
+  }
+
+  it should "return only the available followers and keep every node queued when more are requested than exist" in {
+    val priorityQueue = queueOf(3)
+
+    val followers = priorityQueue.followers(5, MemberAddress("host1", 1))
+
+    followers.toSet shouldBe Set(MemberAddress("host2", 2), MemberAddress("host3", 3))
+    priorityQueue.values().map(_.node).toSet should have size 3
+  }
+
+}
diff --git a/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/utils/DistinctMatcher.scala b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/utils/DistinctMatcher.scala
new file mode 100644
index 0000000..abb9481
--- /dev/null
+++ b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/utils/DistinctMatcher.scala
@@ -0,0 +1,11 @@
+package in.ashwanthkumar.suuchi.utils
+
+import org.scalatest.matchers.{BeMatcher, MatchResult}
+
+/** `be` matcher that accepts a list only when none of its elements occurs more than once. */
+class DistinctMatcher[T] extends BeMatcher[List[T]] {
+  override def apply(left: List[T]): MatchResult = {
+    // name the offending list so that a failure can be diagnosed from the message alone
+    MatchResult(left.distinct.size == left.size, s"$left is not distinct", s"$left is distinct")
+  }
+}