Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ash/Selva - Added full and prefix scan support to all the stores #61

Merged
merged 9 commits into from
Dec 19, 2016
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package in.ashwanthkumar.suuchi.store

import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentHashMap
import java.util.{Arrays => JArrays}
import java.util.concurrent.ConcurrentSkipListMap

import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._

class InMemoryStore extends Store {
private val log = LoggerFactory.getLogger(getClass)
private val store = new ConcurrentHashMap[ByteBuffer, Array[Byte]]()
private val store = new ConcurrentSkipListMap[ByteBuffer, Array[Byte]]()

/**
 * Stores `value` under `key`.
 *
 * @param key   raw key bytes; wrapped in a ByteBuffer to serve as the map key
 * @param value raw value bytes
 * @return always `true`
 */
override def put(key: Array[Byte], value: Array[Byte]): Boolean = {
  // Decode the byte arrays into Strings only when trace logging is actually enabled.
  if (log.isTraceEnabled) {
    log.trace(s"Put with key=${new String(key)}, value=${new String(value)}")
  }
  store.put(ByteBuffer.wrap(key), value)
  true
}

override def get(key: Array[Byte]): Option[Array[Byte]] = {
log.trace(s"Get with key=${new String(key)}")
val value = Option(store.get(ByteBuffer.wrap(key)))
Expand All @@ -26,4 +30,19 @@ class InMemoryStore extends Store {
store.remove(ByteBuffer.wrap(key))
true
}

/**
 * Scans every entry held in the store.
 *
 * Entries are read lazily from the underlying map's entry iterator instead of first being
 * copied into an intermediate collection.
 *
 * @return an iterator over all key/value pairs
 */
override def scan(): Iterator[KV] = {
  // JavaConversions (imported by this file) adapts the java.util.Iterator to a Scala Iterator.
  store.entrySet().iterator().map(entry => KV(entry.getKey.array(), entry.getValue))
}

private def hasPrefix(key: Array[Byte], prefix: Array[Byte]) = {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gsriram7 This looks identical to the one in RocksDbStore. Can you see if you can extract it as a reusable method inside an object?

key.length >= prefix.length && JArrays.equals(key.take(prefix.length), prefix)
}

/**
 * Scans the entries whose key starts with `prefix`.
 *
 * Iteration starts at the first key that is not smaller than `prefix` and stops at the first
 * key that no longer carries the prefix. Entries are streamed lazily from the map instead of
 * being copied into intermediate collections.
 *
 * @param prefix leading bytes every returned key must start with
 * @return an iterator over the matching key/value pairs
 */
override def scan(prefix: Array[Byte]): Iterator[KV] = {
  // JavaConversions (imported by this file) adapts the java.util.Iterator to a Scala Iterator.
  store.tailMap(ByteBuffer.wrap(prefix))
    .entrySet()
    .iterator()
    .takeWhile(entry => hasPrefix(entry.getKey.array(), prefix))
    .map(entry => KV(entry.getKey.array(), entry.getValue))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import java.util.concurrent.ConcurrentHashMap
import in.ashwanthkumar.suuchi.partitioner.Hash
import in.ashwanthkumar.suuchi.utils.Logging

import scala.collection.JavaConversions._

/**
* ShardedStore shards the keys equally into [[partitionsPerNode]] stores and proxies store operations
* against them for a given key.
Expand Down Expand Up @@ -50,4 +52,8 @@ class ShardedStore(partitionsPerNode: Int, hashFn: Hash, createStore: (Int) => S
}
}
}

/**
 * Scans every shard one after the other.
 *
 * The shards are walked lazily: a shard's scan is only started once the previous shard's
 * iterator has been exhausted, instead of draining all shards up front.
 */
override def scan(): Iterator[KV] = map.values().iterator().flatMap(_.scan())

/**
 * Runs a prefix scan against every shard one after the other.
 *
 * The shards are walked lazily: a shard's scan is only started once the previous shard's
 * iterator has been exhausted, instead of draining all shards up front.
 *
 * @param prefix leading bytes handed to each shard's prefix scan
 */
override def scan(prefix: Array[Byte]): Iterator[KV] = map.values().iterator().flatMap(_.scan(prefix))
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package in.ashwanthkumar.suuchi.store

import java.util

/** Read-side contract of a store: lookup of a value by its key. */
trait ReadStore {
/**
 * Looks up the value stored under `key`.
 *
 * @param key raw key bytes
 * @return the value wrapped in an Option; presumably `None` when the key is absent — verify against implementations
 */
def get(key: Array[Byte]): Option[Array[Byte]]
}
Expand All @@ -9,4 +11,17 @@ trait WriteStore {
def remove(key: Array[Byte]): Boolean
}

trait Store extends ReadStore with WriteStore
case class KV(key: Array[Byte], value: Array[Byte]) {
/** Two KVs are equal when both their key bytes and their value bytes are equal. */
override def equals(obj: scala.Any): Boolean = obj match {
  case other: KV => util.Arrays.equals(key, other.key) && util.Arrays.equals(value, other.value)
  case _ => false
}

/** Hashes the array contents (not the array identities) so that it stays consistent with equals. */
override def hashCode(): Int = 31 * util.Arrays.hashCode(key) + util.Arrays.hashCode(value)

/** Renders the key and the value as Strings, separated by a single space. */
override def toString: String = s"${new String(key)} ${new String(value)}"
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gsriram7 If we're not using this - let's remove this method.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I just added it to debug

}
/** Capability to iterate over the key/value pairs held by a store. */
trait Scannable {
/** Iterates over every key/value pair in the store. */
def scan(): Iterator[KV]
/** Iterates over the key/value pairs whose key starts with `prefix`. */
def scan(prefix: Array[Byte]): Iterator[KV]
}

/** A complete store: combines the read, write and scan contracts. */
trait Store extends ReadStore with WriteStore with Scannable
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package in.ashwanthkumar.suuchi.store

import java.util

import in.ashwanthkumar.suuchi.utils.DateUtils
import scala.util.hashing.MurmurHash3

import scala.util.hashing.MurmurHash3
import scala.language.postfixOps


Expand Down Expand Up @@ -33,6 +35,7 @@ object VersionedStore {
val VERSION_PREFIX = "V_".getBytes
val DATA_PREFIX = "D_".getBytes

/** Tells whether `key` starts with [[DATA_PREFIX]]. */
def isDataKey(key: Array[Byte]): Boolean = {
  val leadingBytes = key.take(DATA_PREFIX.length)
  util.Arrays.equals(leadingBytes, DATA_PREFIX)
}
/** Builds the version key for `key` by prepending [[VERSION_PREFIX]]. */
def vkey(key: Array[Byte]): Array[Byte] = Array.concat(VERSION_PREFIX, key)
/** Builds the data key for `key` at an already serialised `version`: prefix, then key, then version. */
def dkey(key: Array[Byte], version: Array[Byte]): Array[Byte] = Array.concat(DATA_PREFIX, key, version)

/** Builds the data key for `key` at a numeric `version`, serialised via PrimitivesSerDeUtils. */
def dkey(key: Array[Byte], version: Long): Array[Byte] = {
  val versionBytes = PrimitivesSerDeUtils.longToBytes(version)
  (DATA_PREFIX ++ key) ++ versionBytes
}
Expand Down Expand Up @@ -107,4 +110,8 @@ class VersionedStore(store: Store, versionedBy: VersionedBy, numVersions: Int, c

/** Removes the data record stored for `key` at the given `version` from the underlying store. */
private def removeData(key: Array[Byte], version: Long) = {
  val dataKey = dkey(key, version)
  store.remove(dataKey)
}
/** Removes the version record stored for `key` from the underlying store. */
private def removeVersion(key: Array[Byte]) = {
  val versionKey = vkey(key)
  store.remove(versionKey)
}

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Selva - we talked about adding a new method `scanVersions` to VersionedStore (with two overloaded variants, like `scan`) that does a prefix scan based on `VersionedStore.vkey`. I find that missing. Can you please add that too?

We don't have an immediate need for it, but it keeps VersionedStore symmetric: we could then scan both for versions only and for records.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but then we agreed that scan should blindly return all the versions, right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, it's the versioned store. Yes, I'll add an overloaded method.

/** Scans the underlying store and keeps only the entries whose key is a data key. */
override def scan(): Iterator[KV] = store.scan().collect {
  case entry if VersionedStore.isDataKey(entry.key) => entry
}

/**
 * Prefix-scans the underlying store and keeps only the entries whose key is a data key.
 *
 * NOTE(review): `prefix` is handed to the underlying store as-is, so it is matched against the
 * raw stored keys — presumably callers must include the data-key prefix themselves; confirm.
 */
override def scan(prefix: Array[Byte]): Iterator[KV] = store.scan(prefix).collect {
  case entry if VersionedStore.isDataKey(entry.key) => entry
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,43 @@
package in.ashwanthkumar.suuchi.store

import org.scalatest.FlatSpec
import org.scalatest.Matchers.{be, convertToAnyShouldWrapper}
import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, have, startWith}

/** Behavioural tests for [[InMemoryStore]]: get/put, full scan and prefix scan. */
class InMemoryStoreTest extends FlatSpec {
  "InMemoryStore" should "support get and put on a KV" in {
    val store = new InMemoryStore
    store.put("1".getBytes, "2".getBytes) should be(true)
    store.get("1".getBytes).map(v => new String(v)) should be(Some("2"))
  }

  it should "support full store scan" in {
    val store = new InMemoryStore
    store.put("1".getBytes, "one".getBytes)
    store.put("2".getBytes, "two".getBytes)
    store.put("3".getBytes, "three".getBytes)
    store.put("4".getBytes, "four".getBytes)
    store.put("5".getBytes, "five".getBytes)

    val kVs = store.scan().toList
    kVs should have size 5
    kVs.sortBy(kv => new String(kv.key)) should be(List(kv("1", "one"), kv("2", "two"), kv("3", "three"), kv("4", "four"), kv("5", "five")))
  }

  it should "support prefix scan" in {
    val store = new InMemoryStore
    store.put("prefix1/1".getBytes, "one".getBytes)
    store.put("prefix1/2".getBytes, "two".getBytes)
    store.put("prefix1/3".getBytes, "three".getBytes)
    store.put("prefix2/1".getBytes, "eleven".getBytes)
    store.put("prefix2/2".getBytes, "twelve".getBytes)
    store.put("prefix2/3".getBytes, "thirteen".getBytes)

    val kVs = store.scan("prefix1".getBytes).toList

    // Without the size check the loop below would pass vacuously on an empty result.
    kVs should have size 3
    kVs.foreach { kv =>
      new String(kv.key) should startWith("prefix1")
    }
  }

  /** Builds a KV from String key and value using the platform default charset. */
  def kv(key: String, value: String) = KV(key.getBytes, value.getBytes)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package in.ashwanthkumar.suuchi.store.rocksdb

import in.ashwanthkumar.suuchi.store.Store
import java.util.{Arrays => JArrays}

import in.ashwanthkumar.suuchi.store.{KV, Store}
import in.ashwanthkumar.suuchi.utils.Logging
import org.rocksdb._

Expand Down Expand Up @@ -31,4 +33,26 @@ class RocksDbStore(config: RocksDbConfiguration) extends Store with Logging {
/**
 * Removes `key` from the database.
 *
 * @return whether the removal, run through `logOnError`, reported success
 */
override def remove(key: Array[Byte]): Boolean = {
  val outcome = logOnError(() => db.remove(key))
  outcome.isSuccess
}

/** Scans the whole database by running a prefix scan with an empty prefix. */
def scan(): Iterator[KV] = scan(Array.empty[Byte])

def scan(prefix: Array[Byte]): Iterator[KV] = {
val rocksIterator: RocksIterator = db.newIterator()
rocksIterator.seek(prefix)

// Adapts the RocksIterator cursor to a Scala Iterator that ends at the first key without the prefix.
val iterator = new Iterator[KV] {
  /** True while the cursor is valid and its current key starts with the requested prefix. */
  override def hasNext: Boolean = rocksIterator.isValid && hasPrefix(rocksIterator.key(), prefix)

  /** Returns the entry under the cursor and advances the cursor by one. */
  override def next(): KV = {
    // Honour the Iterator contract instead of reading key/value from an exhausted cursor.
    if (!hasNext) throw new NoSuchElementException("next on exhausted scan iterator")
    val kv = KV(rocksIterator.key(), rocksIterator.value())
    rocksIterator.next()
    kv
  }

  /** Tells whether `key` starts with `prefix`. */
  private def hasPrefix(key: Array[Byte], prefix: Array[Byte]) = {
    key.length >= prefix.length && JArrays.equals(key.take(prefix.length), prefix)
  }
}
iterator
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gsriram7 See if we can inline this variable.

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,56 @@ package in.ashwanthkumar.suuchi.store.rocksdb

import java.io.File

import in.ashwanthkumar.suuchi.store.KV
import org.apache.commons.io.FileUtils
import org.scalatest.Matchers._
import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, have, startWith}
import org.scalatest.{BeforeAndAfter, FlatSpec}

class RocksDbStoreSpec extends FlatSpec with BeforeAndAfter {
val ROCKSDB_TEST_LOCATION = "/tmp/suuchi-rocks-test"

val ROCKSDB_TEST_LOCATION = "/tmp/suuchi-rocks-test/"
Copy link
Owner Author

@ashwanthkumar ashwanthkumar Dec 9, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gsriram7 Instead of using this hard-coded path, see if we can use `Files.createTempDirectory`. That would ensure we automatically choose a temp directory appropriate for the platform.


var db: RocksDbStore = _

before {
FileUtils.deleteDirectory(new File(ROCKSDB_TEST_LOCATION))
db = new RocksDbStore(RocksDbConfiguration.apply(ROCKSDB_TEST_LOCATION))
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gsriram7 I thought we reverted this change - do we really need to extract this out as a var db and use it? Else was it breaking the tests?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it was breaking because of the LOCK.

}

after {
  // Close the store first so its directory is not deleted from under an open database.
  db.close()
  FileUtils.deleteDirectory(new File(ROCKSDB_TEST_LOCATION))
}

"RocksDb" should "store & retrieve results properly" in {
val db = new RocksDbStore(RocksDbConfiguration.apply(ROCKSDB_TEST_LOCATION))
(1 to 100).foreach { i =>
db.put(Array(i toByte), Array(i*2 toByte))
}

(1 to 100).foreach { i =>
db.get(Array(i toByte)).get.head should be(i*2 toByte)
}
}

// Writes 100 single-byte keys and expects a full scan to return exactly those entries.
it should "support full db scan" in {
  val inputKVs = (1 to 100).map { i => (Array(i.toByte), Array((i * 2).toByte)) }
  for ((k, v) <- inputKVs) db.put(k, v)

  val scannedResult = db.scan().toList
  scannedResult should have size 100

  val expected = inputKVs.map { case (k, v) => KV(k, v) }.toList
  scannedResult.sortBy(kv => new String(kv.key)) should be(expected)
}

// Writes 100 keys under each of two prefixes and expects a prefix scan to return only one group.
it should "support prefix scan" in {
  val kVs = (1 to 100).flatMap { i =>
    List(
      (s"prefix1/$i".getBytes, Array(i.toByte)),
      (s"prefix2/$i".getBytes, Array((i * 2).toByte))
    )
  }
  for ((k, v) <- kVs) db.put(k, v)

  val scannedResult = db.scan("prefix1".getBytes).toList
  scannedResult should have size 100

  for (kv <- scannedResult) {
    new String(kv.key) should startWith("prefix1")
  }
}
}