-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ash/Selva - Added full and prefix scan support to all the stores #61
Changes from 1 commit
075fdfb
342be3b
66cffc6
232fed9
afb3cc0
728d70c
82a53f5
99e6451
c433ab7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,7 @@ | ||
package in.ashwanthkumar.suuchi.store | ||
|
||
import java.util | ||
|
||
trait ReadStore { | ||
def get(key: Array[Byte]): Option[Array[Byte]] | ||
} | ||
|
@@ -9,4 +11,17 @@ trait WriteStore { | |
def remove(key: Array[Byte]): Boolean | ||
} | ||
|
||
trait Store extends ReadStore with WriteStore | ||
case class KV(key: Array[Byte], value: Array[Byte]) { | ||
override def equals(obj: scala.Any): Boolean = obj match { | ||
case o: KV => util.Arrays.equals(key, o.key) && util.Arrays.equals(value, o.value) | ||
case _ => false | ||
} | ||
|
||
override def toString: String = new String(key) + " " + new String(value) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gsriram7 If we're not using this - let's remove this method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. I just added it to debug |
||
} | ||
trait Scannable { | ||
def scan(): Iterator[KV] | ||
def scan(prefix: Array[Byte]): Iterator[KV] | ||
} | ||
|
||
trait Store extends ReadStore with WriteStore with Scannable |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,10 @@ | ||
package in.ashwanthkumar.suuchi.store | ||
|
||
import java.util | ||
|
||
import in.ashwanthkumar.suuchi.utils.DateUtils | ||
import scala.util.hashing.MurmurHash3 | ||
|
||
import scala.util.hashing.MurmurHash3 | ||
import scala.language.postfixOps | ||
|
||
|
||
|
@@ -33,6 +35,7 @@ object VersionedStore { | |
val VERSION_PREFIX = "V_".getBytes | ||
val DATA_PREFIX = "D_".getBytes | ||
|
||
def isDataKey(key: Array[Byte]) = util.Arrays.equals(key.take(DATA_PREFIX.length), DATA_PREFIX) | ||
def vkey(key: Array[Byte]) = VERSION_PREFIX ++ key | ||
def dkey(key: Array[Byte], version: Array[Byte]): Array[Byte] = DATA_PREFIX ++ key ++ version | ||
def dkey(key: Array[Byte], version: Long): Array[Byte] = DATA_PREFIX ++ key ++ PrimitivesSerDeUtils.longToBytes(version) | ||
|
@@ -107,4 +110,8 @@ class VersionedStore(store: Store, versionedBy: VersionedBy, numVersions: Int, c | |
|
||
private def removeData(key: Array[Byte], version: Long) = store.remove(dkey(key, version)) | ||
private def removeVersion(key: Array[Byte]) = store.remove(vkey(key)) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Selva - we talked about adding a new method We don't have an immediate need for it but that goes with symmetry for VersionedStore given we can scan both just for versions and for records. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes then we agreed that scan should blindly give away all the versions right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ohh it's the version store. Yes, I ll add an overloaded method. |
||
override def scan(): Iterator[KV] = store.scan().filter(kv => VersionedStore.isDataKey(kv.key)) | ||
|
||
override def scan(prefix: Array[Byte]): Iterator[KV] = store.scan(prefix).filter(kv => VersionedStore.isDataKey(kv.key)) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,43 @@ | ||
package in.ashwanthkumar.suuchi.store | ||
|
||
import org.scalatest.FlatSpec | ||
import org.scalatest.Matchers.{be, convertToAnyShouldWrapper} | ||
import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, have, startWith} | ||
|
||
class InMemoryStoreTest extends FlatSpec { | ||
"InMemoryStore" should "support get and put on a KV" in { | ||
val store = new InMemoryStore | ||
store.put("1".getBytes, "2".getBytes) should be(true) | ||
store.get("1".getBytes).map(v => new String(v)) should be(Some("2")) | ||
} | ||
|
||
it should "support full store scan" in { | ||
val store = new InMemoryStore | ||
store.put("1".getBytes, "one".getBytes) | ||
store.put("2".getBytes, "two".getBytes) | ||
store.put("3".getBytes, "three".getBytes) | ||
store.put("4".getBytes, "four".getBytes) | ||
store.put("5".getBytes, "five".getBytes) | ||
|
||
val kVs = store.scan().toList | ||
kVs should have size 5 | ||
kVs.sortBy(kv => new String(kv.key)) should be(List(kv("1", "one"), kv("2", "two"), kv("3", "three"), kv("4", "four"), kv("5", "five"))) | ||
} | ||
|
||
it should "support prefix scan" in { | ||
val store = new InMemoryStore | ||
store.put("prefix1/1".getBytes, "one".getBytes) | ||
store.put("prefix1/2".getBytes, "two".getBytes) | ||
store.put("prefix1/3".getBytes, "three".getBytes) | ||
store.put("prefix2/1".getBytes, "eleven".getBytes) | ||
store.put("prefix2/2".getBytes, "twelve".getBytes) | ||
store.put("prefix2/3".getBytes, "thirteen".getBytes) | ||
|
||
val kVs = store.scan("prefix1".getBytes).toList | ||
|
||
kVs.foreach{kv => | ||
new String(kv.key) should startWith("prefix1") | ||
} | ||
} | ||
|
||
def kv(key: String, value: String) = KV(key.getBytes, value.getBytes) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,8 @@ | ||
package in.ashwanthkumar.suuchi.store.rocksdb | ||
|
||
import in.ashwanthkumar.suuchi.store.Store | ||
import java.util.{Arrays => JArrays} | ||
|
||
import in.ashwanthkumar.suuchi.store.{KV, Store} | ||
import in.ashwanthkumar.suuchi.utils.Logging | ||
import org.rocksdb._ | ||
|
||
|
@@ -31,4 +33,26 @@ class RocksDbStore(config: RocksDbConfiguration) extends Store with Logging { | |
override def remove(key: Array[Byte]): Boolean = { | ||
logOnError(() => db.remove(key)) isSuccess | ||
} | ||
|
||
def scan(): Iterator[KV] = scan(Array.ofDim[Byte](0)) | ||
|
||
def scan(prefix: Array[Byte]): Iterator[KV] = { | ||
val rocksIterator: RocksIterator = db.newIterator() | ||
rocksIterator.seek(prefix) | ||
|
||
val iterator = new Iterator[KV] { | ||
override def hasNext: Boolean = rocksIterator.isValid && hasPrefix(rocksIterator.key(), prefix) | ||
override def next(): KV = { | ||
val kv = KV(rocksIterator.key(), rocksIterator.value()) | ||
rocksIterator.next() | ||
kv | ||
} | ||
|
||
private def hasPrefix(key: Array[Byte], prefix: Array[Byte]) = { | ||
key.length >= prefix.length && JArrays.equals(key.take(prefix.length), prefix) | ||
} | ||
} | ||
iterator | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gsriram7 See if we can inline this variable. |
||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,29 +2,56 @@ package in.ashwanthkumar.suuchi.store.rocksdb | |
|
||
import java.io.File | ||
|
||
import in.ashwanthkumar.suuchi.store.KV | ||
import org.apache.commons.io.FileUtils | ||
import org.scalatest.Matchers._ | ||
import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, have, startWith} | ||
import org.scalatest.{BeforeAndAfter, FlatSpec} | ||
|
||
class RocksDbStoreSpec extends FlatSpec with BeforeAndAfter { | ||
val ROCKSDB_TEST_LOCATION = "/tmp/suuchi-rocks-test" | ||
|
||
val ROCKSDB_TEST_LOCATION = "/tmp/suuchi-rocks-test/" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gsriram7 Instead of using this hard-coded path see if we can use the |
||
|
||
var db: RocksDbStore = _ | ||
|
||
before { | ||
FileUtils.deleteDirectory(new File(ROCKSDB_TEST_LOCATION)) | ||
db = new RocksDbStore(RocksDbConfiguration.apply(ROCKSDB_TEST_LOCATION)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gsriram7 I thought we reverted this change - do we really need to extract this out as a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes it was breaking because of the LOCK. |
||
} | ||
|
||
after { | ||
FileUtils.deleteDirectory(new File(ROCKSDB_TEST_LOCATION)) | ||
db.close() | ||
} | ||
|
||
"RocksDb" should "store & retrieve results properly" in { | ||
val db = new RocksDbStore(RocksDbConfiguration.apply(ROCKSDB_TEST_LOCATION)) | ||
(1 to 100).foreach { i => | ||
db.put(Array(i toByte), Array(i*2 toByte)) | ||
} | ||
|
||
(1 to 100).foreach { i => | ||
db.get(Array(i toByte)).get.head should be(i*2 toByte) | ||
} | ||
} | ||
|
||
it should "support full db scan" in { | ||
val inputKVs = (1 to 100).map(i => (Array(i toByte), Array(i*2 toByte))) | ||
|
||
inputKVs.foreach{case (k, v) => db.put(k, v)} | ||
val scannedResult = db.scan().toList | ||
|
||
scannedResult should have size 100 | ||
scannedResult.sortBy(kv => new String(kv.key)) should be(inputKVs.map{case (k,v) => KV(k, v)}.toList) | ||
} | ||
|
||
it should "support prefix scan" in { | ||
val kVs = (1 to 100).flatMap(i => List((s"prefix1/$i".getBytes, Array(i toByte)), (s"prefix2/$i".getBytes, Array(i * 2 toByte)))) | ||
|
||
kVs.foreach{case (k, v) => db.put(k, v)} | ||
val scannedResult = db.scan("prefix1".getBytes).toList | ||
|
||
scannedResult should have size 100 | ||
scannedResult.foreach{ kv => | ||
new String(kv.key) should startWith("prefix1") | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gsriram7 This looks identical to one on RocksDBStore. Can you see if you can extract this out as a reusable method inside an
Object
?