Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing the notion of shards as an encapsulation of primary and s… #59

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package in.ashwanthkumar.suuchi.partitioner

import java.util

import in.ashwanthkumar.suuchi.cluster.MemberAddress

case class Shard(id: Int, primaryPartition: VNode, followers: List[VNode]) {
def key = id + "_" + primaryPartition.node.host + "_" + primaryPartition.node.port

def getNodes = primaryPartition.node :: followers.map(_.node)
}

class CHRWithShards(hashFn: Hash, primaryPartitionsPerNode: Int, replicationFactor: Int = 2) {
private val sortedMap = new util.TreeMap[Integer, Shard]()
private val priority = new NodePriority

def init(nodes: List[MemberAddress]) = {
priority.init(nodes)
nodes.foreach(node => add(node))
this
}

private def hash(shard: Shard): Int = hashFn.hash(shard.key.getBytes)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move all private methods to the bottom of the class.


def followers(node: MemberAddress): List[VNode] = {
priority.followers(replicationFactor - 1, node).map(address => VNode(address, 1, Follower))
}

private def add(node: MemberAddress) = {
(1 to primaryPartitionsPerNode).map(i => Shard(i, VNode(node, 1, Primary), followers(node))).foreach { shard =>
sortedMap.put(hash(shard), shard)
}
this
}

def findCandidate(hash: Integer): Shard = {
if (sortedMap.containsKey(hash)) return sortedMap.get(hash)
val tailMap = sortedMap.tailMap(hash)
val newHash = if (tailMap.isEmpty) sortedMap.firstKey() else tailMap.firstKey()
sortedMap.get(newHash)
}

def find(key: Array[Byte]): List[MemberAddress] = {
if (sortedMap.isEmpty) return Nil

val hashIdx = hashFn.hash(key)

val candidates = findCandidate(hashIdx)

candidates.getNodes
}

// USED ONLY FOR TESTS
private[partitioner] def shards = sortedMap.values()

}

object CHRWithShards {
def apply(hashFn: Hash, nodes: List[MemberAddress], primaryPartitionsPerNode: Int, replicationFactor: Int): CHRWithShards = new CHRWithShards(hashFn, primaryPartitionsPerNode, replicationFactor).init(nodes)
def apply(nodes: List[MemberAddress], primaryPartitionsPerNode: Int, replicationFactor: Int): CHRWithShards = apply(SuuchiHash, nodes, primaryPartitionsPerNode, replicationFactor)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package in.ashwanthkumar.suuchi.partitioner

import in.ashwanthkumar.suuchi.cluster.MemberAddress

import scala.collection.mutable

case class WeightedNode(node: MemberAddress, weight: Int) extends Ordered[WeightedNode] {
override def compare(that: WeightedNode): Int = this.weight.compareTo(that.weight)
}

class NodePriority {
private val queue = mutable.PriorityQueue.empty[WeightedNode]
private val initialWeight = Integer.MAX_VALUE

def init(nodes: List[MemberAddress]) = {
nodes.foreach(node => queue.enqueue(WeightedNode(node, initialWeight)))
}

def add(node: MemberAddress) = {
queue.enqueue(WeightedNode(node, initialWeight))
}

def add(weightedNode: WeightedNode) = {
queue.enqueue(weightedNode)
}

def dePrioritize(weightedNode: WeightedNode) = {
queue.enqueue(WeightedNode(weightedNode.node, weightedNode.weight - 1))
}

def allFollowers(number: Int, currentNode: MemberAddress, followers: List[MemberAddress], adjustQueue: List[WeightedNode]): List[MemberAddress] = {
if (number == 0) {
adjustQueue.foreach(add)
followers
}
else {
val weightedNode = queue.dequeue()
if (weightedNode.node.equals(currentNode) || followers.contains(weightedNode.node)) {
allFollowers(number, currentNode, followers, weightedNode :: adjustQueue)
}
else {
dePrioritize(weightedNode)
allFollowers(number - 1, currentNode, weightedNode.node :: followers, adjustQueue)
}
}
}

def followers(numberOfFollowers: Int, node: MemberAddress) = allFollowers(numberOfFollowers, node, List(), List())

def values() = queue.toList

def dequeue() = queue.dequeue()

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package in.ashwanthkumar.suuchi.partitioner

import in.ashwanthkumar.suuchi.cluster.MemberAddress
import in.ashwanthkumar.suuchi.utils.DistinctMatcher
import org.scalatest.FlatSpec
import org.scalatest.Matchers._

import scala.collection.JavaConversions._

class CHRWithShardsTest extends FlatSpec {

val distinct = new DistinctMatcher[MemberAddress]

def printRing(ring: CHRWithShards)= ring.shards.toList.sortBy(_.id).foreach(s => println(s"Id: ${s.id} Key: ${s.key} Primary: ${s.primaryPartition.node.toExternalForm} Followers: ${s.followers.map(_.node.toExternalForm)}"))

"CHRWithShards" should "pin nodes into the ring accounting for shards" in {
val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3), MemberAddress("host4", 4), MemberAddress("host5", 5))
val ring = CHRWithShards(nodes, primaryPartitionsPerNode = 2, replicationFactor = 3)

ring.shards should have size 10
// printRing(ring)
}

it should "not a node as a follower of itself" in {
val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3))
val ring = CHRWithShards(nodes, 1, 3)

val shards = ring.shards.toList
shards.foreach{ shard =>
shard.followers should not contain shard.primaryPartition.node
}
}

it should "have shards with distinct number nodes to replicate data" in {
val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3))
val ring = CHRWithShards(nodes, 1, 3)

val shards = ring.shards.toList

shards.foreach{shard =>
shard.getNodes shouldBe distinct
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package in.ashwanthkumar.suuchi.partitioner

import in.ashwanthkumar.suuchi.cluster.MemberAddress
import in.ashwanthkumar.suuchi.utils.DistinctMatcher
import org.scalatest.FlatSpec
import org.scalatest.Matchers._

import scala.collection.mutable

class NodePriorityTest extends FlatSpec {

val distinct = new DistinctMatcher[MemberAddress]

"NodePriority" should "init the queue with nodes" in {
val priorityQueue = new NodePriority()
val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3))

priorityQueue.init(nodes)

priorityQueue.values() should have size 3

priorityQueue.add(MemberAddress("host4", 4))

priorityQueue.values() should have size 4

priorityQueue.add(WeightedNode(MemberAddress("host5", 5), 10))

priorityQueue.values() should have size 5
}

it should "decrease a node's weight when it is de-prioritized" in {
val priorityQueue = new NodePriority()
val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3))

priorityQueue.init(nodes)
val weightedNode = priorityQueue.dequeue()

priorityQueue.dePrioritize(weightedNode)

priorityQueue.values().filter(_.node.equals(weightedNode.node)).head.weight should be < weightedNode.weight
}

it should "give nodes in prioritized order when de-queued" in {
val priorityQueue = new NodePriority()
val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3))
val leastWeighed = WeightedNode(MemberAddress("host4", 4), 4)

priorityQueue.init(nodes)
val weightedNode = priorityQueue.dequeue()
priorityQueue.dePrioritize(weightedNode)
priorityQueue.add(leastWeighed)

priorityQueue.dequeue().node should not be weightedNode.node
priorityQueue.dequeue().node should not be weightedNode.node
priorityQueue.dequeue().node should be (weightedNode.node)
priorityQueue.dequeue().node should be (leastWeighed.node)
}

it should "return back the requested number of follower nodes for a given node based on priority to support even distribution of nodes" in {
val priorityQueue = new NodePriority()
val initialWeight = 10
(1 to 10).map(i => WeightedNode(MemberAddress(s"host$i", i), initialWeight)).foreach(priorityQueue.add)
val followersAssignedSoFar = mutable.Set[MemberAddress]()

val node1Followers = priorityQueue.followers(2, MemberAddress("host1", 1)).toSet
node1Followers should have size 2
followersAssignedSoFar.intersect(node1Followers) shouldBe empty
followersAssignedSoFar ++= node1Followers

val node2Followers = priorityQueue.followers(2, MemberAddress("host2", 2)).toSet
node2Followers should have size 2
followersAssignedSoFar.intersect(node2Followers) shouldBe empty
followersAssignedSoFar ++= node2Followers

val node3Followers = priorityQueue.followers(2, MemberAddress("host3", 3)).toSet
node3Followers should have size 2
followersAssignedSoFar.intersect(node3Followers) shouldBe empty
followersAssignedSoFar ++= node3Followers

val node4Followers = priorityQueue.followers(2, MemberAddress("host4", 4)).toSet
node4Followers should have size 2
followersAssignedSoFar.intersect(node4Followers) shouldBe empty
followersAssignedSoFar ++= node4Followers

val node5Followers = priorityQueue.followers(2, MemberAddress("host5", 5)).toSet
node5Followers should have size 2
followersAssignedSoFar.intersect(node5Followers) shouldBe empty
followersAssignedSoFar ++= node5Followers

followersAssignedSoFar should have size 10
}

it should "not return a node as a follower of itself" in {
val priorityQueue = new NodePriority()
val initialWeight = 10
(1 to 10).map(i => WeightedNode(MemberAddress(s"host$i", i), initialWeight)).foreach(priorityQueue.add)

priorityQueue.followers(9, MemberAddress("host1", 1)) shouldNot contain (MemberAddress("host1", 1))
priorityQueue.followers(9, MemberAddress("host5", 5)) shouldNot contain (MemberAddress("host5", 5))
priorityQueue.followers(9, MemberAddress("host10", 10)) shouldNot contain (MemberAddress("host10", 10))
}

it should "followers should be distinct, ie, there shouldn't be a duplicate entry in the result even though the next de-queued item has the highest priority" in {
val priorityQueue = new NodePriority()
val initialWeight = 10
(1 to 10).map(i => WeightedNode(MemberAddress(s"host$i", i), initialWeight)).foreach(priorityQueue.add)
priorityQueue.followers(3, MemberAddress("host1", 1)) shouldBe distinct
priorityQueue.followers(3, MemberAddress("host2", 2)) shouldBe distinct
priorityQueue.followers(3, MemberAddress("host3", 3)) shouldBe distinct
priorityQueue.followers(3, MemberAddress("host4", 4)) shouldBe distinct
priorityQueue.followers(3, MemberAddress("host5", 5)) shouldBe distinct
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package in.ashwanthkumar.suuchi.utils

import org.scalatest.matchers.{BeMatcher, MatchResult}

class DistinctMatcher[T] extends BeMatcher[List[T]] {
override def apply(left: List[T]): MatchResult = {
MatchResult(left.distinct.size == left.size, "is not distinct", "is distinct")
}
}