-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introducing the notion of shards as an encapsulation of primary and s… #59
Closed
Closed
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
f5bedfa
Introducing the notion of shards as an encapsulation of primary and s…
gsriram7 932513a
Fixing a compilation error in CHRWithShards
gsriram7 8bdc42f
Removing unused variables
gsriram7 282dc1d
Using a priority queue approach to assign followers to shards
gsriram7 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
61 changes: 61 additions & 0 deletions
61
suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShards.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
package in.ashwanthkumar.suuchi.partitioner | ||
|
||
import java.util | ||
|
||
import in.ashwanthkumar.suuchi.cluster.MemberAddress | ||
|
||
case class Shard(id: Int, primaryPartition: VNode, followers: List[VNode]) { | ||
def key = id + "_" + primaryPartition.node.host + "_" + primaryPartition.node.port | ||
|
||
def getNodes = primaryPartition.node :: followers.map(_.node) | ||
} | ||
|
||
class CHRWithShards(hashFn: Hash, primaryPartitionsPerNode: Int, replicationFactor: Int = 2) { | ||
private val sortedMap = new util.TreeMap[Integer, Shard]() | ||
private val priority = new NodePriority | ||
|
||
def init(nodes: List[MemberAddress]) = { | ||
priority.init(nodes) | ||
nodes.foreach(node => add(node)) | ||
this | ||
} | ||
|
||
private def hash(shard: Shard): Int = hashFn.hash(shard.key.getBytes) | ||
|
||
def followers(node: MemberAddress): List[VNode] = { | ||
priority.followers(replicationFactor - 1, node).map(address => VNode(address, 1, Follower)) | ||
} | ||
|
||
private def add(node: MemberAddress) = { | ||
(1 to primaryPartitionsPerNode).map(i => Shard(i, VNode(node, 1, Primary), followers(node))).foreach { shard => | ||
sortedMap.put(hash(shard), shard) | ||
} | ||
this | ||
} | ||
|
||
def findCandidate(hash: Integer): Shard = { | ||
if (sortedMap.containsKey(hash)) return sortedMap.get(hash) | ||
val tailMap = sortedMap.tailMap(hash) | ||
val newHash = if (tailMap.isEmpty) sortedMap.firstKey() else tailMap.firstKey() | ||
sortedMap.get(newHash) | ||
} | ||
|
||
def find(key: Array[Byte]): List[MemberAddress] = { | ||
if (sortedMap.isEmpty) return Nil | ||
|
||
val hashIdx = hashFn.hash(key) | ||
|
||
val candidates = findCandidate(hashIdx) | ||
|
||
candidates.getNodes | ||
} | ||
|
||
// USED ONLY FOR TESTS | ||
private[partitioner] def shards = sortedMap.values() | ||
|
||
} | ||
|
||
object CHRWithShards { | ||
def apply(hashFn: Hash, nodes: List[MemberAddress], primaryPartitionsPerNode: Int, replicationFactor: Int): CHRWithShards = new CHRWithShards(hashFn, primaryPartitionsPerNode, replicationFactor).init(nodes) | ||
def apply(nodes: List[MemberAddress], primaryPartitionsPerNode: Int, replicationFactor: Int): CHRWithShards = apply(SuuchiHash, nodes, primaryPartitionsPerNode, replicationFactor) | ||
} |
54 changes: 54 additions & 0 deletions
54
suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/partitioner/NodePriority.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package in.ashwanthkumar.suuchi.partitioner | ||
|
||
import in.ashwanthkumar.suuchi.cluster.MemberAddress | ||
|
||
import scala.collection.mutable | ||
|
||
case class WeightedNode(node: MemberAddress, weight: Int) extends Ordered[WeightedNode] { | ||
override def compare(that: WeightedNode): Int = this.weight.compareTo(that.weight) | ||
} | ||
|
||
class NodePriority { | ||
private val queue = mutable.PriorityQueue.empty[WeightedNode] | ||
private val initialWeight = Integer.MAX_VALUE | ||
|
||
def init(nodes: List[MemberAddress]) = { | ||
nodes.foreach(node => queue.enqueue(WeightedNode(node, initialWeight))) | ||
} | ||
|
||
def add(node: MemberAddress) = { | ||
queue.enqueue(WeightedNode(node, initialWeight)) | ||
} | ||
|
||
def add(weightedNode: WeightedNode) = { | ||
queue.enqueue(weightedNode) | ||
} | ||
|
||
def dePrioritize(weightedNode: WeightedNode) = { | ||
queue.enqueue(WeightedNode(weightedNode.node, weightedNode.weight - 1)) | ||
} | ||
|
||
def allFollowers(number: Int, currentNode: MemberAddress, followers: List[MemberAddress], adjustQueue: List[WeightedNode]): List[MemberAddress] = { | ||
if (number == 0) { | ||
adjustQueue.foreach(add) | ||
followers | ||
} | ||
else { | ||
val weightedNode = queue.dequeue() | ||
if (weightedNode.node.equals(currentNode) || followers.contains(weightedNode.node)) { | ||
allFollowers(number, currentNode, followers, weightedNode :: adjustQueue) | ||
} | ||
else { | ||
dePrioritize(weightedNode) | ||
allFollowers(number - 1, currentNode, weightedNode.node :: followers, adjustQueue) | ||
} | ||
} | ||
} | ||
|
||
def followers(numberOfFollowers: Int, node: MemberAddress) = allFollowers(numberOfFollowers, node, List(), List()) | ||
|
||
def values() = queue.toList | ||
|
||
def dequeue() = queue.dequeue() | ||
|
||
} |
44 changes: 44 additions & 0 deletions
44
suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/partitioner/CHRWithShardsTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
package in.ashwanthkumar.suuchi.partitioner | ||
|
||
import in.ashwanthkumar.suuchi.cluster.MemberAddress | ||
import in.ashwanthkumar.suuchi.utils.DistinctMatcher | ||
import org.scalatest.FlatSpec | ||
import org.scalatest.Matchers._ | ||
|
||
import scala.collection.JavaConversions._ | ||
|
||
class CHRWithShardsTest extends FlatSpec { | ||
|
||
val distinct = new DistinctMatcher[MemberAddress] | ||
|
||
def printRing(ring: CHRWithShards)= ring.shards.toList.sortBy(_.id).foreach(s => println(s"Id: ${s.id} Key: ${s.key} Primary: ${s.primaryPartition.node.toExternalForm} Followers: ${s.followers.map(_.node.toExternalForm)}")) | ||
|
||
"CHRWithShards" should "pin nodes into the ring accounting for shards" in { | ||
val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3), MemberAddress("host4", 4), MemberAddress("host5", 5)) | ||
val ring = CHRWithShards(nodes, primaryPartitionsPerNode = 2, replicationFactor = 3) | ||
|
||
ring.shards should have size 10 | ||
// printRing(ring) | ||
} | ||
|
||
it should "not a node as a follower of itself" in { | ||
val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3)) | ||
val ring = CHRWithShards(nodes, 1, 3) | ||
|
||
val shards = ring.shards.toList | ||
shards.foreach{ shard => | ||
shard.followers should not contain shard.primaryPartition.node | ||
} | ||
} | ||
|
||
it should "have shards with distinct number nodes to replicate data" in { | ||
val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3)) | ||
val ring = CHRWithShards(nodes, 1, 3) | ||
|
||
val shards = ring.shards.toList | ||
|
||
shards.foreach{shard => | ||
shard.getNodes shouldBe distinct | ||
} | ||
} | ||
} |
114 changes: 114 additions & 0 deletions
114
suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/partitioner/NodePriorityTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
package in.ashwanthkumar.suuchi.partitioner | ||
|
||
import in.ashwanthkumar.suuchi.cluster.MemberAddress | ||
import in.ashwanthkumar.suuchi.utils.DistinctMatcher | ||
import org.scalatest.FlatSpec | ||
import org.scalatest.Matchers._ | ||
|
||
import scala.collection.mutable | ||
|
||
class NodePriorityTest extends FlatSpec { | ||
|
||
val distinct = new DistinctMatcher[MemberAddress] | ||
|
||
"NodePriority" should "init the queue with nodes" in { | ||
val priorityQueue = new NodePriority() | ||
val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3)) | ||
|
||
priorityQueue.init(nodes) | ||
|
||
priorityQueue.values() should have size 3 | ||
|
||
priorityQueue.add(MemberAddress("host4", 4)) | ||
|
||
priorityQueue.values() should have size 4 | ||
|
||
priorityQueue.add(WeightedNode(MemberAddress("host5", 5), 10)) | ||
|
||
priorityQueue.values() should have size 5 | ||
} | ||
|
||
it should "decrease a node's weight when it is de-prioritized" in { | ||
val priorityQueue = new NodePriority() | ||
val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3)) | ||
|
||
priorityQueue.init(nodes) | ||
val weightedNode = priorityQueue.dequeue() | ||
|
||
priorityQueue.dePrioritize(weightedNode) | ||
|
||
priorityQueue.values().filter(_.node.equals(weightedNode.node)).head.weight should be < weightedNode.weight | ||
} | ||
|
||
it should "give nodes in prioritized order when de-queued" in { | ||
val priorityQueue = new NodePriority() | ||
val nodes = List(MemberAddress("host1", 1), MemberAddress("host2", 2), MemberAddress("host3", 3)) | ||
val leastWeighed = WeightedNode(MemberAddress("host4", 4), 4) | ||
|
||
priorityQueue.init(nodes) | ||
val weightedNode = priorityQueue.dequeue() | ||
priorityQueue.dePrioritize(weightedNode) | ||
priorityQueue.add(leastWeighed) | ||
|
||
priorityQueue.dequeue().node should not be weightedNode.node | ||
priorityQueue.dequeue().node should not be weightedNode.node | ||
priorityQueue.dequeue().node should be (weightedNode.node) | ||
priorityQueue.dequeue().node should be (leastWeighed.node) | ||
} | ||
|
||
it should "return back the requested number of follower nodes for a given node based on priority to support even distribution of nodes" in { | ||
val priorityQueue = new NodePriority() | ||
val initialWeight = 10 | ||
(1 to 10).map(i => WeightedNode(MemberAddress(s"host$i", i), initialWeight)).foreach(priorityQueue.add) | ||
val followersAssignedSoFar = mutable.Set[MemberAddress]() | ||
|
||
val node1Followers = priorityQueue.followers(2, MemberAddress("host1", 1)).toSet | ||
node1Followers should have size 2 | ||
followersAssignedSoFar.intersect(node1Followers) shouldBe empty | ||
followersAssignedSoFar ++= node1Followers | ||
|
||
val node2Followers = priorityQueue.followers(2, MemberAddress("host2", 2)).toSet | ||
node2Followers should have size 2 | ||
followersAssignedSoFar.intersect(node2Followers) shouldBe empty | ||
followersAssignedSoFar ++= node2Followers | ||
|
||
val node3Followers = priorityQueue.followers(2, MemberAddress("host3", 3)).toSet | ||
node3Followers should have size 2 | ||
followersAssignedSoFar.intersect(node3Followers) shouldBe empty | ||
followersAssignedSoFar ++= node3Followers | ||
|
||
val node4Followers = priorityQueue.followers(2, MemberAddress("host4", 4)).toSet | ||
node4Followers should have size 2 | ||
followersAssignedSoFar.intersect(node4Followers) shouldBe empty | ||
followersAssignedSoFar ++= node4Followers | ||
|
||
val node5Followers = priorityQueue.followers(2, MemberAddress("host5", 5)).toSet | ||
node5Followers should have size 2 | ||
followersAssignedSoFar.intersect(node5Followers) shouldBe empty | ||
followersAssignedSoFar ++= node5Followers | ||
|
||
followersAssignedSoFar should have size 10 | ||
} | ||
|
||
it should "not return a node as a follower of itself" in { | ||
val priorityQueue = new NodePriority() | ||
val initialWeight = 10 | ||
(1 to 10).map(i => WeightedNode(MemberAddress(s"host$i", i), initialWeight)).foreach(priorityQueue.add) | ||
|
||
priorityQueue.followers(9, MemberAddress("host1", 1)) shouldNot contain (MemberAddress("host1", 1)) | ||
priorityQueue.followers(9, MemberAddress("host5", 5)) shouldNot contain (MemberAddress("host5", 5)) | ||
priorityQueue.followers(9, MemberAddress("host10", 10)) shouldNot contain (MemberAddress("host10", 10)) | ||
} | ||
|
||
it should "followers should be distinct, ie, there shouldn't be a duplicate entry in the result even though the next de-queued item has the highest priority" in { | ||
val priorityQueue = new NodePriority() | ||
val initialWeight = 10 | ||
(1 to 10).map(i => WeightedNode(MemberAddress(s"host$i", i), initialWeight)).foreach(priorityQueue.add) | ||
priorityQueue.followers(3, MemberAddress("host1", 1)) shouldBe distinct | ||
priorityQueue.followers(3, MemberAddress("host2", 2)) shouldBe distinct | ||
priorityQueue.followers(3, MemberAddress("host3", 3)) shouldBe distinct | ||
priorityQueue.followers(3, MemberAddress("host4", 4)) shouldBe distinct | ||
priorityQueue.followers(3, MemberAddress("host5", 5)) shouldBe distinct | ||
} | ||
|
||
} |
9 changes: 9 additions & 0 deletions
9
suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/utils/DistinctMatcher.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
package in.ashwanthkumar.suuchi.utils | ||
|
||
import org.scalatest.matchers.{BeMatcher, MatchResult} | ||
|
||
class DistinctMatcher[T] extends BeMatcher[List[T]] { | ||
override def apply(left: List[T]): MatchResult = { | ||
MatchResult(left.distinct.size == left.size, "is not distinct", "is distinct") | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please move all
private
methods to the bottom of the class.